author    Reynold Xin <rxin@databricks.com>  2015-02-03 00:29:04 -0800
committer Reynold Xin <rxin@databricks.com>  2015-02-03 00:29:04 -0800
commit    37df330135e6a3e62c580e5706eade5f1eaf5b13 (patch)
tree      f7ddd7cd55b0e621ea6facdfdbf06a11b5cc955b /sql
parent    659329f9ee51ca8ae6232e07c45b5d9144d49667 (diff)
[SQL][DataFrame] Remove DataFrameApi, ExpressionApi, and GroupedDataFrameApi
They were there mostly for code review and easier checking of the API. I don't
think they need to be there anymore.

Author: Reynold Xin <rxin@databricks.com>

Closes #4328 from rxin/remove-df-api and squashes the following commits:

723d600 [Reynold Xin] [SQL][DataFrame] Remove DataFrameApi and ColumnApi.
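The shape of the change is mechanical: the public traits stop mixing in the
review-only interfaces, and the affected members simply drop their `override`
modifiers. A minimal before/after sketch (simplified; the real traits carry
many more members):

    // Before this commit: the review-only trait forced `override` everywhere.
    trait Column extends DataFrame with ExpressionApi {
      override def isNull: Column
    }

    // After this commit: the extra supertype and the modifiers are gone.
    trait Column extends DataFrame {
      def isNull: Column
    }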
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Column.scala            118
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala         110
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala       8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala   19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala             63
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api.scala               326
6 files changed, 200 insertions(+), 444 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 6f48d7c3fe..0d6055ff23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -56,7 +56,7 @@ private[sql] object Column {
*
*/
// TODO: Improve documentation.
-trait Column extends DataFrame with ExpressionApi {
+trait Column extends DataFrame {
protected[sql] def expr: Expression
@@ -101,7 +101,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( -df("amount") )
* }}}
*/
- override def unary_- : Column = constructColumn(null) { UnaryMinus(expr) }
+ def unary_- : Column = constructColumn(null) { UnaryMinus(expr) }
/**
* Bitwise NOT.
@@ -110,7 +110,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( ~df("flags") )
* }}}
*/
- override def unary_~ : Column = constructColumn(null) { BitwiseNot(expr) }
+ def unary_~ : Column = constructColumn(null) { BitwiseNot(expr) }
/**
* Inversion of boolean expression, i.e. NOT.
@@ -119,7 +119,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( !df("isActive") )
* }}}
*/
- override def unary_! : Column = constructColumn(null) { Not(expr) }
+ def unary_! : Column = constructColumn(null) { Not(expr) }
/**
@@ -130,7 +130,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( df("colA".equalTo(df("colB")) )
* }}}
*/
- override def === (other: Column): Column = constructColumn(other) {
+ def === (other: Column): Column = constructColumn(other) {
EqualTo(expr, other.expr)
}
@@ -142,7 +142,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( df("colA".equalTo("Zaharia") )
* }}}
*/
- override def === (literal: Any): Column = this === lit(literal)
+ def === (literal: Any): Column = this === lit(literal)
/**
* Equality test with an expression.
@@ -152,7 +152,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( df("colA".equalTo(df("colB")) )
* }}}
*/
- override def equalTo(other: Column): Column = this === other
+ def equalTo(other: Column): Column = this === other
/**
* Equality test with a literal value.
@@ -162,7 +162,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( df("colA".equalTo("Zaharia") )
* }}}
*/
- override def equalTo(literal: Any): Column = this === literal
+ def equalTo(literal: Any): Column = this === literal
/**
* Inequality test with an expression.
@@ -172,7 +172,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( !(df("colA") === df("colB")) )
* }}}
*/
- override def !== (other: Column): Column = constructColumn(other) {
+ def !== (other: Column): Column = constructColumn(other) {
Not(EqualTo(expr, other.expr))
}
@@ -184,7 +184,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select( !(df("colA") === 15) )
* }}}
*/
- override def !== (literal: Any): Column = this !== lit(literal)
+ def !== (literal: Any): Column = this !== lit(literal)
/**
* Greater than an expression.
@@ -193,7 +193,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") > Literal(21) )
* }}}
*/
- override def > (other: Column): Column = constructColumn(other) {
+ def > (other: Column): Column = constructColumn(other) {
GreaterThan(expr, other.expr)
}
@@ -204,7 +204,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") > 21 )
* }}}
*/
- override def > (literal: Any): Column = this > lit(literal)
+ def > (literal: Any): Column = this > lit(literal)
/**
* Less than an expression.
@@ -213,7 +213,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") < Literal(21) )
* }}}
*/
- override def < (other: Column): Column = constructColumn(other) {
+ def < (other: Column): Column = constructColumn(other) {
LessThan(expr, other.expr)
}
@@ -224,7 +224,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") < 21 )
* }}}
*/
- override def < (literal: Any): Column = this < lit(literal)
+ def < (literal: Any): Column = this < lit(literal)
/**
* Less than or equal to an expression.
@@ -233,7 +233,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") <= Literal(21) )
* }}}
*/
- override def <= (other: Column): Column = constructColumn(other) {
+ def <= (other: Column): Column = constructColumn(other) {
LessThanOrEqual(expr, other.expr)
}
@@ -244,7 +244,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") <= 21 )
* }}}
*/
- override def <= (literal: Any): Column = this <= lit(literal)
+ def <= (literal: Any): Column = this <= lit(literal)
/**
* Greater than or equal to an expression.
@@ -253,7 +253,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") >= Literal(21) )
* }}}
*/
- override def >= (other: Column): Column = constructColumn(other) {
+ def >= (other: Column): Column = constructColumn(other) {
GreaterThanOrEqual(expr, other.expr)
}
@@ -264,12 +264,12 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("age") >= 21 )
* }}}
*/
- override def >= (literal: Any): Column = this >= lit(literal)
+ def >= (literal: Any): Column = this >= lit(literal)
/**
* Equality test with an expression that is safe for null values.
*/
- override def <=> (other: Column): Column = constructColumn(other) {
+ def <=> (other: Column): Column = constructColumn(other) {
other match {
case null => EqualNullSafe(expr, lit(null).expr)
case _ => EqualNullSafe(expr, other.expr)
@@ -279,17 +279,17 @@ trait Column extends DataFrame with ExpressionApi {
/**
* Equality test with a literal value that is safe for null values.
*/
- override def <=> (literal: Any): Column = this <=> lit(literal)
+ def <=> (literal: Any): Column = this <=> lit(literal)
/**
* True if the current expression is null.
*/
- override def isNull: Column = constructColumn(null) { IsNull(expr) }
+ def isNull: Column = constructColumn(null) { IsNull(expr) }
/**
* True if the current expression is NOT null.
*/
- override def isNotNull: Column = constructColumn(null) { IsNotNull(expr) }
+ def isNotNull: Column = constructColumn(null) { IsNotNull(expr) }
/**
* Boolean OR with an expression.
@@ -298,7 +298,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("inSchool") || people("isEmployed") )
* }}}
*/
- override def || (other: Column): Column = constructColumn(other) {
+ def || (other: Column): Column = constructColumn(other) {
Or(expr, other.expr)
}
@@ -309,7 +309,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("inSchool") || true )
* }}}
*/
- override def || (literal: Boolean): Column = this || lit(literal)
+ def || (literal: Boolean): Column = this || lit(literal)
/**
* Boolean AND with an expression.
@@ -318,7 +318,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("inSchool") && people("isEmployed") )
* }}}
*/
- override def && (other: Column): Column = constructColumn(other) {
+ def && (other: Column): Column = constructColumn(other) {
And(expr, other.expr)
}
@@ -329,43 +329,43 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("inSchool") && true )
* }}}
*/
- override def && (literal: Boolean): Column = this && lit(literal)
+ def && (literal: Boolean): Column = this && lit(literal)
/**
* Bitwise AND with an expression.
*/
- override def & (other: Column): Column = constructColumn(other) {
+ def & (other: Column): Column = constructColumn(other) {
BitwiseAnd(expr, other.expr)
}
/**
* Bitwise AND with a literal value.
*/
- override def & (literal: Any): Column = this & lit(literal)
+ def & (literal: Any): Column = this & lit(literal)
/**
* Bitwise OR with an expression.
*/
- override def | (other: Column): Column = constructColumn(other) {
+ def | (other: Column): Column = constructColumn(other) {
BitwiseOr(expr, other.expr)
}
/**
* Bitwise OR with a literal value.
*/
- override def | (literal: Any): Column = this | lit(literal)
+ def | (literal: Any): Column = this | lit(literal)
/**
* Bitwise XOR with an expression.
*/
- override def ^ (other: Column): Column = constructColumn(other) {
+ def ^ (other: Column): Column = constructColumn(other) {
BitwiseXor(expr, other.expr)
}
/**
* Bitwise XOR with a literal value.
*/
- override def ^ (literal: Any): Column = this ^ lit(literal)
+ def ^ (literal: Any): Column = this ^ lit(literal)
/**
* Sum of this expression and another expression.
@@ -374,7 +374,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") + people("weight") )
* }}}
*/
- override def + (other: Column): Column = constructColumn(other) {
+ def + (other: Column): Column = constructColumn(other) {
Add(expr, other.expr)
}
@@ -385,7 +385,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") + 10 )
* }}}
*/
- override def + (literal: Any): Column = this + lit(literal)
+ def + (literal: Any): Column = this + lit(literal)
/**
* Subtraction. Subtract the other expression from this expression.
@@ -394,7 +394,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") - people("weight") )
* }}}
*/
- override def - (other: Column): Column = constructColumn(other) {
+ def - (other: Column): Column = constructColumn(other) {
Subtract(expr, other.expr)
}
@@ -405,7 +405,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") - 10 )
* }}}
*/
- override def - (literal: Any): Column = this - lit(literal)
+ def - (literal: Any): Column = this - lit(literal)
/**
* Multiplication of this expression and another expression.
@@ -414,7 +414,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") * people("weight") )
* }}}
*/
- override def * (other: Column): Column = constructColumn(other) {
+ def * (other: Column): Column = constructColumn(other) {
Multiply(expr, other.expr)
}
@@ -425,7 +425,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") * 10 )
* }}}
*/
- override def * (literal: Any): Column = this * lit(literal)
+ def * (literal: Any): Column = this * lit(literal)
/**
* Division of this expression by another expression.
@@ -434,7 +434,7 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") / people("weight") )
* }}}
*/
- override def / (other: Column): Column = constructColumn(other) {
+ def / (other: Column): Column = constructColumn(other) {
Divide(expr, other.expr)
}
@@ -445,19 +445,19 @@ trait Column extends DataFrame with ExpressionApi {
* people.select( people("height") / 10 )
* }}}
*/
- override def / (literal: Any): Column = this / lit(literal)
+ def / (literal: Any): Column = this / lit(literal)
/**
* Modulo (a.k.a. remainder) expression.
*/
- override def % (other: Column): Column = constructColumn(other) {
+ def % (other: Column): Column = constructColumn(other) {
Remainder(expr, other.expr)
}
/**
* Modulo (a.k.a. remainder) expression.
*/
- override def % (literal: Any): Column = this % lit(literal)
+ def % (literal: Any): Column = this % lit(literal)
/**
@@ -465,29 +465,29 @@ trait Column extends DataFrame with ExpressionApi {
* by the evaluated values of the arguments.
*/
@scala.annotation.varargs
- override def in(list: Column*): Column = {
+ def in(list: Column*): Column = {
new IncomputableColumn(In(expr, list.map(_.expr)))
}
- override def like(literal: String): Column = constructColumn(null) {
+ def like(literal: String): Column = constructColumn(null) {
Like(expr, lit(literal).expr)
}
- override def rlike(literal: String): Column = constructColumn(null) {
+ def rlike(literal: String): Column = constructColumn(null) {
RLike(expr, lit(literal).expr)
}
/**
* An expression that gets an item at position `ordinal` out of an array.
*/
- override def getItem(ordinal: Int): Column = constructColumn(null) {
+ def getItem(ordinal: Int): Column = constructColumn(null) {
GetItem(expr, Literal(ordinal))
}
/**
* An expression that gets a field by name in a [[StructField]].
*/
- override def getField(fieldName: String): Column = constructColumn(null) {
+ def getField(fieldName: String): Column = constructColumn(null) {
GetField(expr, fieldName)
}
@@ -496,7 +496,7 @@ trait Column extends DataFrame with ExpressionApi {
* @param startPos expression for the starting position.
* @param len expression for the length of the substring.
*/
- override def substr(startPos: Column, len: Column): Column = {
+ def substr(startPos: Column, len: Column): Column = {
new IncomputableColumn(Substring(expr, startPos.expr, len.expr))
}
@@ -505,25 +505,25 @@ trait Column extends DataFrame with ExpressionApi {
* @param startPos starting position.
* @param len length of the substring.
*/
- override def substr(startPos: Int, len: Int): Column = this.substr(lit(startPos), lit(len))
+ def substr(startPos: Int, len: Int): Column = this.substr(lit(startPos), lit(len))
- override def contains(other: Column): Column = constructColumn(other) {
+ def contains(other: Column): Column = constructColumn(other) {
Contains(expr, other.expr)
}
- override def contains(literal: Any): Column = this.contains(lit(literal))
+ def contains(literal: Any): Column = this.contains(lit(literal))
- override def startsWith(other: Column): Column = constructColumn(other) {
+ def startsWith(other: Column): Column = constructColumn(other) {
StartsWith(expr, other.expr)
}
- override def startsWith(literal: String): Column = this.startsWith(lit(literal))
+ def startsWith(literal: String): Column = this.startsWith(lit(literal))
- override def endsWith(other: Column): Column = constructColumn(other) {
+ def endsWith(other: Column): Column = constructColumn(other) {
EndsWith(expr, other.expr)
}
- override def endsWith(literal: String): Column = this.endsWith(lit(literal))
+ def endsWith(literal: String): Column = this.endsWith(lit(literal))
/**
* Gives the column an alias.
@@ -545,7 +545,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select(df("colA").cast("int"))
* }}}
*/
- override def cast(to: DataType): Column = constructColumn(null) { Cast(expr, to) }
+ def cast(to: DataType): Column = constructColumn(null) { Cast(expr, to) }
/**
* Casts the column to a different data type, using the canonical string representation
@@ -556,7 +556,7 @@ trait Column extends DataFrame with ExpressionApi {
* df.select(df("colA").cast("int"))
* }}}
*/
- override def cast(to: String): Column = constructColumn(null) {
+ def cast(to: String): Column = constructColumn(null) {
Cast(expr, to.toLowerCase match {
case "string" => StringType
case "boolean" => BooleanType
@@ -573,9 +573,9 @@ trait Column extends DataFrame with ExpressionApi {
})
}
- override def desc: Column = constructColumn(null) { SortOrder(expr, Descending) }
+ def desc: Column = constructColumn(null) { SortOrder(expr, Descending) }
- override def asc: Column = constructColumn(null) { SortOrder(expr, Ascending) }
+ def asc: Column = constructColumn(null) { SortOrder(expr, Ascending) }
}
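The operators above keep their behavior; removing ExpressionApi only strips the
`override` modifiers. A usage sketch based on the doc examples in this diff
(`people` is an assumed DataFrame with `age` and `name` columns):

    people.select( people("age") + 10 )              // arithmetic
    people.select( people("age") >= 21 )             // comparison
    people.select( people("name").startsWith("A") )  // string predicate
    people.select( people("name").substr(0, 2) )     // substring
    people.select( people("age").cast("string") )    // cast by canonical type name
    people.sort( people("age").desc )                // sort-order expression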
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4cbfb6af5d..5920852e8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -71,7 +71,7 @@ private[sql] object DataFrame {
* }}}
*/
// TODO: Improve documentation.
-trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
+trait DataFrame extends RDDApi[Row] {
val sqlContext: SQLContext
@@ -80,7 +80,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
protected[sql] def logicalPlan: LogicalPlan
- /** Left here for compatibility reasons. */
+ /** Left here for backward compatibility. */
@deprecated("1.3.0", "use toDataFrame")
def toSchemaRDD: DataFrame = this
@@ -102,16 +102,16 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
def toDataFrame(colName: String, colNames: String*): DataFrame
/** Returns the schema of this [[DataFrame]]. */
- override def schema: StructType
+ def schema: StructType
/** Returns all column names and their data types as an array. */
- override def dtypes: Array[(String, String)]
+ def dtypes: Array[(String, String)]
/** Returns all column names as an array. */
- override def columns: Array[String] = schema.fields.map(_.name)
+ def columns: Array[String] = schema.fields.map(_.name)
/** Prints the schema to the console in a nice tree format. */
- override def printSchema(): Unit
+ def printSchema(): Unit
/**
* Cartesian join with another [[DataFrame]].
@@ -120,7 +120,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
*
* @param right Right side of the join operation.
*/
- override def join(right: DataFrame): DataFrame
+ def join(right: DataFrame): DataFrame
/**
* Inner join with another [[DataFrame]], using the given join expression.
@@ -131,7 +131,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
*/
- override def join(right: DataFrame, joinExprs: Column): DataFrame
+ def join(right: DataFrame, joinExprs: Column): DataFrame
/**
* Join with another [[DataFrame]], using the given join expression. The following performs
@@ -145,7 +145,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
*/
- override def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
+ def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
/**
* Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
@@ -157,7 +157,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def sort(sortCol: String, sortCols: String*): DataFrame
+ def sort(sortCol: String, sortCols: String*): DataFrame
/**
* Returns a new [[DataFrame]] sorted by the given expressions. For example:
@@ -166,26 +166,26 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def sort(sortExpr: Column, sortExprs: Column*): DataFrame
+ def sort(sortExpr: Column, sortExprs: Column*): DataFrame
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
*/
@scala.annotation.varargs
- override def orderBy(sortCol: String, sortCols: String*): DataFrame
+ def orderBy(sortCol: String, sortCols: String*): DataFrame
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
*/
@scala.annotation.varargs
- override def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame
+ def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame
/**
* Selects column based on the column name and return it as a [[Column]].
*/
- override def apply(colName: String): Column
+ def apply(colName: String): Column
/**
* Selects a set of expressions, wrapped in a Product.
@@ -195,12 +195,12 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* df.select($"colA", $"colB" + 1)
* }}}
*/
- override def apply(projection: Product): DataFrame
+ def apply(projection: Product): DataFrame
/**
* Returns a new [[DataFrame]] with an alias set.
*/
- override def as(name: String): DataFrame
+ def as(name: String): DataFrame
/**
* Selects a set of expressions.
@@ -209,7 +209,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def select(cols: Column*): DataFrame
+ def select(cols: Column*): DataFrame
/**
* Selects a set of columns. This is a variant of `select` that can only select
@@ -222,7 +222,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def select(col: String, cols: String*): DataFrame
+ def select(col: String, cols: String*): DataFrame
/**
* Filters rows using the given condition.
@@ -233,7 +233,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* peopleDf($"age" > 15)
* }}}
*/
- override def filter(condition: Column): DataFrame
+ def filter(condition: Column): DataFrame
/**
* Filters rows using the given condition. This is an alias for `filter`.
@@ -244,7 +244,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* peopleDf($"age" > 15)
* }}}
*/
- override def where(condition: Column): DataFrame
+ def where(condition: Column): DataFrame
/**
* Filters rows using the given condition. This is a shorthand meant for Scala.
@@ -255,7 +255,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* peopleDf($"age" > 15)
* }}}
*/
- override def apply(condition: Column): DataFrame
+ def apply(condition: Column): DataFrame
/**
* Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
@@ -273,7 +273,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def groupBy(cols: Column*): GroupedDataFrame
+ def groupBy(cols: Column*): GroupedDataFrame
/**
* Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
@@ -294,7 +294,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def groupBy(col1: String, cols: String*): GroupedDataFrame
+ def groupBy(col1: String, cols: String*): GroupedDataFrame
/**
* Aggregates on the entire [[DataFrame]] without groups.
@@ -304,7 +304,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
*/
- override def agg(exprs: Map[String, String]): DataFrame
+ def agg(exprs: Map[String, String]): DataFrame
/**
* Aggregates on the entire [[DataFrame]] without groups.
@@ -314,7 +314,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
*/
- override def agg(exprs: java.util.Map[String, String]): DataFrame
+ def agg(exprs: java.util.Map[String, String]): DataFrame
/**
* Aggregates on the entire [[DataFrame]] without groups.
@@ -325,31 +325,31 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- override def agg(expr: Column, exprs: Column*): DataFrame
+ def agg(expr: Column, exprs: Column*): DataFrame
/**
* Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
* and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
*/
- override def limit(n: Int): DataFrame
+ def limit(n: Int): DataFrame
/**
* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
*/
- override def unionAll(other: DataFrame): DataFrame
+ def unionAll(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
*/
- override def intersect(other: DataFrame): DataFrame
+ def intersect(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
*/
- override def except(other: DataFrame): DataFrame
+ def except(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] by sampling a fraction of rows.
@@ -358,7 +358,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* @param fraction Fraction of rows to generate.
* @param seed Seed for sampling.
*/
- override def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
+ def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
/**
* Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
@@ -366,24 +366,24 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
*/
- override def sample(withReplacement: Boolean, fraction: Double): DataFrame
+ def sample(withReplacement: Boolean, fraction: Double): DataFrame
/////////////////////////////////////////////////////////////////////////////
/**
* Returns a new [[DataFrame]] by adding a column.
*/
- override def addColumn(colName: String, col: Column): DataFrame
+ def addColumn(colName: String, col: Column): DataFrame
/**
* Returns the first `n` rows.
*/
- override def head(n: Int): Array[Row]
+ def head(n: Int): Array[Row]
/**
* Returns the first row.
*/
- override def head(): Row
+ def head(): Row
/**
* Returns the first row. Alias for head().
@@ -453,7 +453,17 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
/**
* Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
*/
- override def rdd: RDD[Row]
+ def rdd: RDD[Row]
+
+ /**
+ * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ */
+ def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
+
+ /**
+ * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ */
+ def javaRDD: JavaRDD[Row] = toJavaRDD
/**
* Registers this RDD as a temporary table using the given name. The lifetime of this temporary
@@ -461,14 +471,14 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
*
* @group schema
*/
- override def registerTempTable(tableName: String): Unit
+ def registerTempTable(tableName: String): Unit
/**
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
*/
- override def saveAsParquetFile(path: String): Unit
+ def saveAsParquetFile(path: String): Unit
/**
* :: Experimental ::
@@ -481,7 +491,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* be the target of an `insertInto`.
*/
@Experimental
- override def saveAsTable(tableName: String): Unit
+ def saveAsTable(tableName: String): Unit
/**
* :: Experimental ::
@@ -494,7 +504,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* be the target of an `insertInto`.
*/
@Experimental
- override def saveAsTable(
+ def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
@@ -511,22 +521,22 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* be the target of an `insertInto`.
*/
@Experimental
- override def saveAsTable(
+ def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit
@Experimental
- override def save(path: String): Unit
+ def save(path: String): Unit
@Experimental
- override def save(
+ def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit
@Experimental
- override def save(
+ def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit
@@ -535,12 +545,20 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
*/
@Experimental
- override def insertInto(tableName: String, overwrite: Boolean): Unit
+ def insertInto(tableName: String, overwrite: Boolean): Unit
+
+ /**
+ * :: Experimental ::
+ * Adds the rows from this RDD to the specified table.
+ * Throws an exception if the table already exists.
+ */
+ @Experimental
+ def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
*/
- override def toJSON: RDD[String]
+ def toJSON: RDD[String]
////////////////////////////////////////////////////////////////////////////
// for Python API
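With DataFrameSpecificApi gone, the signatures above are the public surface of
DataFrame itself, and this diff also adds two conveniences: `toJavaRDD`/`javaRDD`
and a single-argument `insertInto`. A hedged sketch, assuming an existing `df`
with an `age` column and a pre-created `people` table:

    val adults = df.filter(df("age") > 15)      // returns a new DataFrame
    val rows: JavaRDD[Row] = adults.toJavaRDD   // added here: Java-friendly view of rdd
    adults.insertInto("people")                 // added here: overwrite defaults to false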
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index f84dbf32fa..49fd131534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import java.util.{List => JList}
-
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -42,13 +40,17 @@ import org.apache.spark.util.Utils
/**
- * See [[DataFrame]] for documentation.
+ * Internal implementation of [[DataFrame]]. Users of the API should use [[DataFrame]] directly.
*/
private[sql] class DataFrameImpl protected[sql](
override val sqlContext: SQLContext,
val queryExecution: SQLContext#QueryExecution)
extends DataFrame {
+ /**
+ * A constructor that automatically analyzes the logical plan. This reports errors eagerly
+ * as the [[DataFrame]] is constructed.
+ */
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
this(sqlContext, {
val qe = sqlContext.executePlan(logicalPlan)
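The practical effect of the eager-analysis constructor is that an invalid plan
surfaces when the DataFrame is built rather than when the first action runs. A
hedged sketch, assuming `df` has no column named `missing`:

    // Analysis runs when the DataFrame is constructed, so the unresolved
    // reference fails here, not later at collect() or count() time.
    val bad = df.select(df("missing"))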
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
index d3acd41bbf..6d0f3e8ce3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Aggregate
/**
* A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]].
*/
-class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])
- extends GroupedDataFrameApi {
+class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) {
private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): DataFrame = {
val namedGroupingExprs = groupingExprs.map {
@@ -72,7 +71,7 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr
* ))
* }}}
*/
- override def agg(exprs: Map[String, String]): DataFrame = {
+ def agg(exprs: Map[String, String]): DataFrame = {
exprs.map { case (colName, expr) =>
val a = strToExpr(expr)(df(colName).expr)
Alias(a, a.toString)()
@@ -109,7 +108,7 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr
* }}}
*/
@scala.annotation.varargs
- override def agg(expr: Column, exprs: Column*): DataFrame = {
+ def agg(expr: Column, exprs: Column*): DataFrame = {
val aggExprs = (expr +: exprs).map(_.expr).map {
case expr: NamedExpression => expr
case expr: Expression => Alias(expr, expr.toString)()
@@ -121,35 +120,35 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr
* Count the number of rows for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
+ def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
/**
* Compute the average value for each numeric column for each group. This is an alias for `avg`.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def mean(): DataFrame = aggregateNumericColumns(Average)
+ def mean(): DataFrame = aggregateNumericColumns(Average)
/**
* Compute the max value for each numeric column for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def max(): DataFrame = aggregateNumericColumns(Max)
+ def max(): DataFrame = aggregateNumericColumns(Max)
/**
* Compute the mean value for each numeric column for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def avg(): DataFrame = aggregateNumericColumns(Average)
+ def avg(): DataFrame = aggregateNumericColumns(Average)
/**
* Compute the min value for each numeric column for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def min(): DataFrame = aggregateNumericColumns(Min)
+ def min(): DataFrame = aggregateNumericColumns(Min)
/**
* Compute the sum for each numeric column for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
*/
- override def sum(): DataFrame = aggregateNumericColumns(Sum)
+ def sum(): DataFrame = aggregateNumericColumns(Sum)
}
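A usage sketch for the aggregation methods above, assuming `df` has
`department`, `age`, and `salary` columns (the Map form mirrors the doc
example in this file):

    df.groupBy(df("department")).agg(Map("age" -> "max", "salary" -> "avg"))
    df.groupBy("department").count()   // keeps the grouping columns, adds `count`
    df.groupBy("department").mean()    // averages every numeric column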
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
new file mode 100644
index 0000000000..38e6382f17
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+
+/**
+ * An internal interface defining the RDD-like methods for [[DataFrame]].
+ * Please use [[DataFrame]] directly, and do NOT use this.
+ */
+private[sql] trait RDDApi[T] {
+
+ def cache(): this.type = persist()
+
+ def persist(): this.type
+
+ def persist(newLevel: StorageLevel): this.type
+
+ def unpersist(): this.type = unpersist(blocking = false)
+
+ def unpersist(blocking: Boolean): this.type
+
+ def map[R: ClassTag](f: T => R): RDD[R]
+
+ def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R]
+
+ def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R]
+
+ def foreach(f: T => Unit): Unit
+
+ def foreachPartition(f: Iterator[T] => Unit): Unit
+
+ def take(n: Int): Array[T]
+
+ def collect(): Array[T]
+
+ def collectAsList(): java.util.List[T]
+
+ def count(): Long
+
+ def first(): T
+
+ def repartition(numPartitions: Int): DataFrame
+}
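Because `trait DataFrame extends RDDApi[Row]`, these methods are callable
directly on a DataFrame. A hedged sketch, assuming an existing `df` whose
first column is a string:

    df.cache()                                           // delegates to persist()
    val total: Long = df.count()
    val firstCol: RDD[String] = df.map(_.getString(0))   // map yields a plain RDD
    df.unpersist()                                       // blocking = false by default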
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
deleted file mode 100644
index c4a00cdb20..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.storage.StorageLevel
-
-
-/**
- * An internal interface defining the RDD-like methods for [[DataFrame]].
- * Please use [[DataFrame]] directly, and do NOT use this.
- */
-private[sql] trait RDDApi[T] {
-
- def cache(): this.type = persist()
-
- def persist(): this.type
-
- def persist(newLevel: StorageLevel): this.type
-
- def unpersist(): this.type = unpersist(blocking = false)
-
- def unpersist(blocking: Boolean): this.type
-
- def map[R: ClassTag](f: T => R): RDD[R]
-
- def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R]
-
- def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R]
-
- def foreach(f: T => Unit): Unit
-
- def foreachPartition(f: Iterator[T] => Unit): Unit
-
- def take(n: Int): Array[T]
-
- def collect(): Array[T]
-
- def collectAsList(): java.util.List[T]
-
- def count(): Long
-
- def first(): T
-
- def repartition(numPartitions: Int): DataFrame
-}
-
-
-/**
- * An internal interface defining data frame related methods in [[DataFrame]].
- * Please use [[DataFrame]] directly, and do NOT use this.
- */
-private[sql] trait DataFrameSpecificApi {
-
- def schema: StructType
-
- def printSchema(): Unit
-
- def dtypes: Array[(String, String)]
-
- def columns: Array[String]
-
- def head(): Row
-
- def head(n: Int): Array[Row]
-
- /////////////////////////////////////////////////////////////////////////////
- // Relational operators
- /////////////////////////////////////////////////////////////////////////////
- def apply(colName: String): Column
-
- def apply(projection: Product): DataFrame
-
- @scala.annotation.varargs
- def select(cols: Column*): DataFrame
-
- @scala.annotation.varargs
- def select(col: String, cols: String*): DataFrame
-
- def apply(condition: Column): DataFrame
-
- def as(name: String): DataFrame
-
- def filter(condition: Column): DataFrame
-
- def where(condition: Column): DataFrame
-
- @scala.annotation.varargs
- def groupBy(cols: Column*): GroupedDataFrame
-
- @scala.annotation.varargs
- def groupBy(col1: String, cols: String*): GroupedDataFrame
-
- def agg(exprs: Map[String, String]): DataFrame
-
- def agg(exprs: java.util.Map[String, String]): DataFrame
-
- @scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame
-
- @scala.annotation.varargs
- def sort(sortExpr: Column, sortExprs: Column*): DataFrame
-
- @scala.annotation.varargs
- def sort(sortCol: String, sortCols: String*): DataFrame
-
- @scala.annotation.varargs
- def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame
-
- @scala.annotation.varargs
- def orderBy(sortCol: String, sortCols: String*): DataFrame
-
- def join(right: DataFrame): DataFrame
-
- def join(right: DataFrame, joinExprs: Column): DataFrame
-
- def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
-
- def limit(n: Int): DataFrame
-
- def unionAll(other: DataFrame): DataFrame
-
- def intersect(other: DataFrame): DataFrame
-
- def except(other: DataFrame): DataFrame
-
- def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
-
- def sample(withReplacement: Boolean, fraction: Double): DataFrame
-
- /////////////////////////////////////////////////////////////////////////////
- // Column mutation
- /////////////////////////////////////////////////////////////////////////////
- def addColumn(colName: String, col: Column): DataFrame
-
- /////////////////////////////////////////////////////////////////////////////
- // I/O and interaction with other frameworks
- /////////////////////////////////////////////////////////////////////////////
-
- def rdd: RDD[Row]
-
- def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
-
- def toJSON: RDD[String]
-
- def registerTempTable(tableName: String): Unit
-
- def saveAsParquetFile(path: String): Unit
-
- @Experimental
- def saveAsTable(tableName: String): Unit
-
- @Experimental
- def saveAsTable(
- tableName: String,
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit
-
- @Experimental
- def saveAsTable(
- tableName: String,
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit
-
- @Experimental
- def save(path: String): Unit
-
- @Experimental
- def save(
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit
-
- @Experimental
- def save(
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit
-
- @Experimental
- def insertInto(tableName: String, overwrite: Boolean): Unit
-
- @Experimental
- def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
- /////////////////////////////////////////////////////////////////////////////
- // Stat functions
- /////////////////////////////////////////////////////////////////////////////
-// def describe(): Unit
-//
-// def mean(): Unit
-//
-// def max(): Unit
-//
-// def min(): Unit
-}
-
-
-/**
- * An internal interface defining expression APIs for [[DataFrame]].
- * Please use [[DataFrame]] and [[Column]] directly, and do NOT use this.
- */
-private[sql] trait ExpressionApi {
-
- def isComputable: Boolean
-
- def unary_- : Column
- def unary_! : Column
- def unary_~ : Column
-
- def + (other: Column): Column
- def + (other: Any): Column
- def - (other: Column): Column
- def - (other: Any): Column
- def * (other: Column): Column
- def * (other: Any): Column
- def / (other: Column): Column
- def / (other: Any): Column
- def % (other: Column): Column
- def % (other: Any): Column
- def & (other: Column): Column
- def & (other: Any): Column
- def | (other: Column): Column
- def | (other: Any): Column
- def ^ (other: Column): Column
- def ^ (other: Any): Column
-
- def && (other: Column): Column
- def && (other: Boolean): Column
- def || (other: Column): Column
- def || (other: Boolean): Column
-
- def < (other: Column): Column
- def < (other: Any): Column
- def <= (other: Column): Column
- def <= (other: Any): Column
- def > (other: Column): Column
- def > (other: Any): Column
- def >= (other: Column): Column
- def >= (other: Any): Column
- def === (other: Column): Column
- def === (other: Any): Column
- def equalTo(other: Column): Column
- def equalTo(other: Any): Column
- def <=> (other: Column): Column
- def <=> (other: Any): Column
- def !== (other: Column): Column
- def !== (other: Any): Column
-
- @scala.annotation.varargs
- def in(list: Column*): Column
-
- def like(other: String): Column
- def rlike(other: String): Column
-
- def contains(other: Column): Column
- def contains(other: Any): Column
- def startsWith(other: Column): Column
- def startsWith(other: String): Column
- def endsWith(other: Column): Column
- def endsWith(other: String): Column
-
- def substr(startPos: Column, len: Column): Column
- def substr(startPos: Int, len: Int): Column
-
- def isNull: Column
- def isNotNull: Column
-
- def getItem(ordinal: Int): Column
- def getField(fieldName: String): Column
-
- def cast(to: DataType): Column
- def cast(to: String): Column
-
- def asc: Column
- def desc: Column
-
- def as(alias: String): Column
-}
-
-
-/**
- * An internal interface defining aggregation APIs for [[DataFrame]].
- * Please use [[DataFrame]] and [[GroupedDataFrame]] directly, and do NOT use this.
- */
-private[sql] trait GroupedDataFrameApi {
-
- def agg(exprs: Map[String, String]): DataFrame
-
- @scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame
-
- def avg(): DataFrame
-
- def mean(): DataFrame
-
- def min(): DataFrame
-
- def max(): DataFrame
-
- def sum(): DataFrame
-
- def count(): DataFrame
-
- // TODO: Add var, std
-}