diff options
author | Reynold Xin <rxin@databricks.com> | 2015-02-03 00:29:04 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-03 00:29:04 -0800 |
commit | 37df330135e6a3e62c580e5706eade5f1eaf5b13 (patch) | |
tree | f7ddd7cd55b0e621ea6facdfdbf06a11b5cc955b /sql | |
parent | 659329f9ee51ca8ae6232e07c45b5d9144d49667 (diff) | |
download | spark-37df330135e6a3e62c580e5706eade5f1eaf5b13.tar.gz spark-37df330135e6a3e62c580e5706eade5f1eaf5b13.tar.bz2 spark-37df330135e6a3e62c580e5706eade5f1eaf5b13.zip |
[SQL][DataFrame] Remove DataFrameApi, ExpressionApi, and GroupedDataFrameApi
They were there mostly for code review and easier check of the API. I don't think they need to be there anymore.
Author: Reynold Xin <rxin@databricks.com>
Closes #4328 from rxin/remove-df-api and squashes the following commits:
723d600 [Reynold Xin] [SQL][DataFrame] Remove DataFrameApi and ColumnApi.
Diffstat (limited to 'sql')
6 files changed, 200 insertions, 444 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 6f48d7c3fe..0d6055ff23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -56,7 +56,7 @@ private[sql] object Column { * */ // TODO: Improve documentation. -trait Column extends DataFrame with ExpressionApi { +trait Column extends DataFrame { protected[sql] def expr: Expression @@ -101,7 +101,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( -df("amount") ) * }}} */ - override def unary_- : Column = constructColumn(null) { UnaryMinus(expr) } + def unary_- : Column = constructColumn(null) { UnaryMinus(expr) } /** * Bitwise NOT. @@ -110,7 +110,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( ~df("flags") ) * }}} */ - override def unary_~ : Column = constructColumn(null) { BitwiseNot(expr) } + def unary_~ : Column = constructColumn(null) { BitwiseNot(expr) } /** * Inversion of boolean expression, i.e. NOT. @@ -119,7 +119,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( !df("isActive") ) * }} */ - override def unary_! : Column = constructColumn(null) { Not(expr) } + def unary_! : Column = constructColumn(null) { Not(expr) } /** @@ -130,7 +130,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( df("colA".equalTo(df("colB")) ) * }}} */ - override def === (other: Column): Column = constructColumn(other) { + def === (other: Column): Column = constructColumn(other) { EqualTo(expr, other.expr) } @@ -142,7 +142,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( df("colA".equalTo("Zaharia") ) * }}} */ - override def === (literal: Any): Column = this === lit(literal) + def === (literal: Any): Column = this === lit(literal) /** * Equality test with an expression. 
@@ -152,7 +152,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( df("colA".equalTo(df("colB")) ) * }}} */ - override def equalTo(other: Column): Column = this === other + def equalTo(other: Column): Column = this === other /** * Equality test with a literal value. @@ -162,7 +162,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( df("colA".equalTo("Zaharia") ) * }}} */ - override def equalTo(literal: Any): Column = this === literal + def equalTo(literal: Any): Column = this === literal /** * Inequality test with an expression. @@ -172,7 +172,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( !(df("colA") === df("colB")) ) * }}} */ - override def !== (other: Column): Column = constructColumn(other) { + def !== (other: Column): Column = constructColumn(other) { Not(EqualTo(expr, other.expr)) } @@ -184,7 +184,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select( !(df("colA") === 15) ) * }}} */ - override def !== (literal: Any): Column = this !== lit(literal) + def !== (literal: Any): Column = this !== lit(literal) /** * Greater than an expression. @@ -193,7 +193,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") > Literal(21) ) * }}} */ - override def > (other: Column): Column = constructColumn(other) { + def > (other: Column): Column = constructColumn(other) { GreaterThan(expr, other.expr) } @@ -204,7 +204,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") > 21 ) * }}} */ - override def > (literal: Any): Column = this > lit(literal) + def > (literal: Any): Column = this > lit(literal) /** * Less than an expression. 
@@ -213,7 +213,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") < Literal(21) ) * }}} */ - override def < (other: Column): Column = constructColumn(other) { + def < (other: Column): Column = constructColumn(other) { LessThan(expr, other.expr) } @@ -224,7 +224,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") < 21 ) * }}} */ - override def < (literal: Any): Column = this < lit(literal) + def < (literal: Any): Column = this < lit(literal) /** * Less than or equal to an expression. @@ -233,7 +233,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") <= Literal(21) ) * }}} */ - override def <= (other: Column): Column = constructColumn(other) { + def <= (other: Column): Column = constructColumn(other) { LessThanOrEqual(expr, other.expr) } @@ -244,7 +244,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") <= 21 ) * }}} */ - override def <= (literal: Any): Column = this <= lit(literal) + def <= (literal: Any): Column = this <= lit(literal) /** * Greater than or equal to an expression. @@ -253,7 +253,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") >= Literal(21) ) * }}} */ - override def >= (other: Column): Column = constructColumn(other) { + def >= (other: Column): Column = constructColumn(other) { GreaterThanOrEqual(expr, other.expr) } @@ -264,12 +264,12 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("age") >= 21 ) * }}} */ - override def >= (literal: Any): Column = this >= lit(literal) + def >= (literal: Any): Column = this >= lit(literal) /** * Equality test with an expression that is safe for null values. 
*/ - override def <=> (other: Column): Column = constructColumn(other) { + def <=> (other: Column): Column = constructColumn(other) { other match { case null => EqualNullSafe(expr, lit(null).expr) case _ => EqualNullSafe(expr, other.expr) @@ -279,17 +279,17 @@ trait Column extends DataFrame with ExpressionApi { /** * Equality test with a literal value that is safe for null values. */ - override def <=> (literal: Any): Column = this <=> lit(literal) + def <=> (literal: Any): Column = this <=> lit(literal) /** * True if the current expression is null. */ - override def isNull: Column = constructColumn(null) { IsNull(expr) } + def isNull: Column = constructColumn(null) { IsNull(expr) } /** * True if the current expression is NOT null. */ - override def isNotNull: Column = constructColumn(null) { IsNotNull(expr) } + def isNotNull: Column = constructColumn(null) { IsNotNull(expr) } /** * Boolean OR with an expression. @@ -298,7 +298,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("inSchool") || people("isEmployed") ) * }}} */ - override def || (other: Column): Column = constructColumn(other) { + def || (other: Column): Column = constructColumn(other) { Or(expr, other.expr) } @@ -309,7 +309,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("inSchool") || true ) * }}} */ - override def || (literal: Boolean): Column = this || lit(literal) + def || (literal: Boolean): Column = this || lit(literal) /** * Boolean AND with an expression. 
@@ -318,7 +318,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("inSchool") && people("isEmployed") ) * }}} */ - override def && (other: Column): Column = constructColumn(other) { + def && (other: Column): Column = constructColumn(other) { And(expr, other.expr) } @@ -329,43 +329,43 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("inSchool") && true ) * }}} */ - override def && (literal: Boolean): Column = this && lit(literal) + def && (literal: Boolean): Column = this && lit(literal) /** * Bitwise AND with an expression. */ - override def & (other: Column): Column = constructColumn(other) { + def & (other: Column): Column = constructColumn(other) { BitwiseAnd(expr, other.expr) } /** * Bitwise AND with a literal value. */ - override def & (literal: Any): Column = this & lit(literal) + def & (literal: Any): Column = this & lit(literal) /** * Bitwise OR with an expression. */ - override def | (other: Column): Column = constructColumn(other) { + def | (other: Column): Column = constructColumn(other) { BitwiseOr(expr, other.expr) } /** * Bitwise OR with a literal value. */ - override def | (literal: Any): Column = this | lit(literal) + def | (literal: Any): Column = this | lit(literal) /** * Bitwise XOR with an expression. */ - override def ^ (other: Column): Column = constructColumn(other) { + def ^ (other: Column): Column = constructColumn(other) { BitwiseXor(expr, other.expr) } /** * Bitwise XOR with a literal value. */ - override def ^ (literal: Any): Column = this ^ lit(literal) + def ^ (literal: Any): Column = this ^ lit(literal) /** * Sum of this expression and another expression. 
@@ -374,7 +374,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") + people("weight") ) * }}} */ - override def + (other: Column): Column = constructColumn(other) { + def + (other: Column): Column = constructColumn(other) { Add(expr, other.expr) } @@ -385,7 +385,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") + 10 ) * }}} */ - override def + (literal: Any): Column = this + lit(literal) + def + (literal: Any): Column = this + lit(literal) /** * Subtraction. Subtract the other expression from this expression. @@ -394,7 +394,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") - people("weight") ) * }}} */ - override def - (other: Column): Column = constructColumn(other) { + def - (other: Column): Column = constructColumn(other) { Subtract(expr, other.expr) } @@ -405,7 +405,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") - 10 ) * }}} */ - override def - (literal: Any): Column = this - lit(literal) + def - (literal: Any): Column = this - lit(literal) /** * Multiplication of this expression and another expression. @@ -414,7 +414,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") * people("weight") ) * }}} */ - override def * (other: Column): Column = constructColumn(other) { + def * (other: Column): Column = constructColumn(other) { Multiply(expr, other.expr) } @@ -425,7 +425,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") * 10 ) * }}} */ - override def * (literal: Any): Column = this * lit(literal) + def * (literal: Any): Column = this * lit(literal) /** * Division this expression by another expression. 
@@ -434,7 +434,7 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") / people("weight") ) * }}} */ - override def / (other: Column): Column = constructColumn(other) { + def / (other: Column): Column = constructColumn(other) { Divide(expr, other.expr) } @@ -445,19 +445,19 @@ trait Column extends DataFrame with ExpressionApi { * people.select( people("height") / 10 ) * }}} */ - override def / (literal: Any): Column = this / lit(literal) + def / (literal: Any): Column = this / lit(literal) /** * Modulo (a.k.a. remainder) expression. */ - override def % (other: Column): Column = constructColumn(other) { + def % (other: Column): Column = constructColumn(other) { Remainder(expr, other.expr) } /** * Modulo (a.k.a. remainder) expression. */ - override def % (literal: Any): Column = this % lit(literal) + def % (literal: Any): Column = this % lit(literal) /** @@ -465,29 +465,29 @@ trait Column extends DataFrame with ExpressionApi { * by the evaluated values of the arguments. */ @scala.annotation.varargs - override def in(list: Column*): Column = { + def in(list: Column*): Column = { new IncomputableColumn(In(expr, list.map(_.expr))) } - override def like(literal: String): Column = constructColumn(null) { + def like(literal: String): Column = constructColumn(null) { Like(expr, lit(literal).expr) } - override def rlike(literal: String): Column = constructColumn(null) { + def rlike(literal: String): Column = constructColumn(null) { RLike(expr, lit(literal).expr) } /** * An expression that gets an item at position `ordinal` out of an array. */ - override def getItem(ordinal: Int): Column = constructColumn(null) { + def getItem(ordinal: Int): Column = constructColumn(null) { GetItem(expr, Literal(ordinal)) } /** * An expression that gets a field by name in a [[StructField]]. 
*/ - override def getField(fieldName: String): Column = constructColumn(null) { + def getField(fieldName: String): Column = constructColumn(null) { GetField(expr, fieldName) } @@ -496,7 +496,7 @@ trait Column extends DataFrame with ExpressionApi { * @param startPos expression for the starting position. * @param len expression for the length of the substring. */ - override def substr(startPos: Column, len: Column): Column = { + def substr(startPos: Column, len: Column): Column = { new IncomputableColumn(Substring(expr, startPos.expr, len.expr)) } @@ -505,25 +505,25 @@ trait Column extends DataFrame with ExpressionApi { * @param startPos starting position. * @param len length of the substring. */ - override def substr(startPos: Int, len: Int): Column = this.substr(lit(startPos), lit(len)) + def substr(startPos: Int, len: Int): Column = this.substr(lit(startPos), lit(len)) - override def contains(other: Column): Column = constructColumn(other) { + def contains(other: Column): Column = constructColumn(other) { Contains(expr, other.expr) } - override def contains(literal: Any): Column = this.contains(lit(literal)) + def contains(literal: Any): Column = this.contains(lit(literal)) - override def startsWith(other: Column): Column = constructColumn(other) { + def startsWith(other: Column): Column = constructColumn(other) { StartsWith(expr, other.expr) } - override def startsWith(literal: String): Column = this.startsWith(lit(literal)) + def startsWith(literal: String): Column = this.startsWith(lit(literal)) - override def endsWith(other: Column): Column = constructColumn(other) { + def endsWith(other: Column): Column = constructColumn(other) { EndsWith(expr, other.expr) } - override def endsWith(literal: String): Column = this.endsWith(lit(literal)) + def endsWith(literal: String): Column = this.endsWith(lit(literal)) /** * Gives the column an alias. 
@@ -545,7 +545,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select(df("colA").cast("int")) * }}} */ - override def cast(to: DataType): Column = constructColumn(null) { Cast(expr, to) } + def cast(to: DataType): Column = constructColumn(null) { Cast(expr, to) } /** * Casts the column to a different data type, using the canonical string representation @@ -556,7 +556,7 @@ trait Column extends DataFrame with ExpressionApi { * df.select(df("colA").cast("int")) * }}} */ - override def cast(to: String): Column = constructColumn(null) { + def cast(to: String): Column = constructColumn(null) { Cast(expr, to.toLowerCase match { case "string" => StringType case "boolean" => BooleanType @@ -573,9 +573,9 @@ trait Column extends DataFrame with ExpressionApi { }) } - override def desc: Column = constructColumn(null) { SortOrder(expr, Descending) } + def desc: Column = constructColumn(null) { SortOrder(expr, Descending) } - override def asc: Column = constructColumn(null) { SortOrder(expr, Ascending) } + def asc: Column = constructColumn(null) { SortOrder(expr, Ascending) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 4cbfb6af5d..5920852e8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -71,7 +71,7 @@ private[sql] object DataFrame { * }}} */ // TODO: Improve documentation. -trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { +trait DataFrame extends RDDApi[Row] { val sqlContext: SQLContext @@ -80,7 +80,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { protected[sql] def logicalPlan: LogicalPlan - /** Left here for compatibility reasons. */ + /** Left here for backward compatibility. 
*/ @deprecated("1.3.0", "use toDataFrame") def toSchemaRDD: DataFrame = this @@ -102,16 +102,16 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { def toDataFrame(colName: String, colNames: String*): DataFrame /** Returns the schema of this [[DataFrame]]. */ - override def schema: StructType + def schema: StructType /** Returns all column names and their data types as an array. */ - override def dtypes: Array[(String, String)] + def dtypes: Array[(String, String)] /** Returns all column names as an array. */ - override def columns: Array[String] = schema.fields.map(_.name) + def columns: Array[String] = schema.fields.map(_.name) /** Prints the schema to the console in a nice tree format. */ - override def printSchema(): Unit + def printSchema(): Unit /** * Cartesian join with another [[DataFrame]]. @@ -120,7 +120,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * * @param right Right side of the join operation. */ - override def join(right: DataFrame): DataFrame + def join(right: DataFrame): DataFrame /** * Inner join with another [[DataFrame]], using the given join expression. @@ -131,7 +131,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * df1.join(df2).where($"df1Key" === $"df2Key") * }}} */ - override def join(right: DataFrame, joinExprs: Column): DataFrame + def join(right: DataFrame, joinExprs: Column): DataFrame /** * Join with another [[DataFrame]], using the given join expression. The following performs @@ -145,7 +145,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * @param joinExprs Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`. */ - override def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame + def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame /** * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order. 
@@ -157,7 +157,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def sort(sortCol: String, sortCols: String*): DataFrame + def sort(sortCol: String, sortCols: String*): DataFrame /** * Returns a new [[DataFrame]] sorted by the given expressions. For example: @@ -166,26 +166,26 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def sort(sortExpr: Column, sortExprs: Column*): DataFrame + def sort(sortExpr: Column, sortExprs: Column*): DataFrame /** * Returns a new [[DataFrame]] sorted by the given expressions. * This is an alias of the `sort` function. */ @scala.annotation.varargs - override def orderBy(sortCol: String, sortCols: String*): DataFrame + def orderBy(sortCol: String, sortCols: String*): DataFrame /** * Returns a new [[DataFrame]] sorted by the given expressions. * This is an alias of the `sort` function. */ @scala.annotation.varargs - override def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame + def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame /** * Selects column based on the column name and return it as a [[Column]]. */ - override def apply(colName: String): Column + def apply(colName: String): Column /** * Selects a set of expressions, wrapped in a Product. @@ -195,12 +195,12 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * df.select($"colA", $"colB" + 1) * }}} */ - override def apply(projection: Product): DataFrame + def apply(projection: Product): DataFrame /** * Returns a new [[DataFrame]] with an alias set. */ - override def as(name: String): DataFrame + def as(name: String): DataFrame /** * Selects a set of expressions. @@ -209,7 +209,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def select(cols: Column*): DataFrame + def select(cols: Column*): DataFrame /** * Selects a set of columns. 
This is a variant of `select` that can only select @@ -222,7 +222,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def select(col: String, cols: String*): DataFrame + def select(col: String, cols: String*): DataFrame /** * Filters rows using the given condition. @@ -233,7 +233,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * peopleDf($"age" > 15) * }}} */ - override def filter(condition: Column): DataFrame + def filter(condition: Column): DataFrame /** * Filters rows using the given condition. This is an alias for `filter`. @@ -244,7 +244,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * peopleDf($"age" > 15) * }}} */ - override def where(condition: Column): DataFrame + def where(condition: Column): DataFrame /** * Filters rows using the given condition. This is a shorthand meant for Scala. @@ -255,7 +255,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * peopleDf($"age" > 15) * }}} */ - override def apply(condition: Column): DataFrame + def apply(condition: Column): DataFrame /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. @@ -273,7 +273,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def groupBy(cols: Column*): GroupedDataFrame + def groupBy(cols: Column*): GroupedDataFrame /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. @@ -294,7 +294,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }}} */ @scala.annotation.varargs - override def groupBy(col1: String, cols: String*): GroupedDataFrame + def groupBy(col1: String, cols: String*): GroupedDataFrame /** * Aggregates on the entire [[DataFrame]] without groups. 
@@ -304,7 +304,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }} */ - override def agg(exprs: Map[String, String]): DataFrame + def agg(exprs: Map[String, String]): DataFrame /** * Aggregates on the entire [[DataFrame]] without groups. @@ -314,7 +314,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }} */ - override def agg(exprs: java.util.Map[String, String]): DataFrame + def agg(exprs: java.util.Map[String, String]): DataFrame /** * Aggregates on the entire [[DataFrame]] without groups. @@ -325,31 +325,31 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * }} */ @scala.annotation.varargs - override def agg(expr: Column, exprs: Column*): DataFrame + def agg(expr: Column, exprs: Column*): DataFrame /** * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]]. */ - override def limit(n: Int): DataFrame + def limit(n: Int): DataFrame /** * Returns a new [[DataFrame]] containing union of rows in this frame and another frame. * This is equivalent to `UNION ALL` in SQL. */ - override def unionAll(other: DataFrame): DataFrame + def unionAll(other: DataFrame): DataFrame /** * Returns a new [[DataFrame]] containing rows only in both this frame and another frame. * This is equivalent to `INTERSECT` in SQL. */ - override def intersect(other: DataFrame): DataFrame + def intersect(other: DataFrame): DataFrame /** * Returns a new [[DataFrame]] containing rows in this frame but not in another frame. * This is equivalent to `EXCEPT` in SQL. */ - override def except(other: DataFrame): DataFrame + def except(other: DataFrame): DataFrame /** * Returns a new [[DataFrame]] by sampling a fraction of rows. 
@@ -358,7 +358,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * @param fraction Fraction of rows to generate. * @param seed Seed for sampling. */ - override def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame + def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame /** * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed. @@ -366,24 +366,24 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. */ - override def sample(withReplacement: Boolean, fraction: Double): DataFrame + def sample(withReplacement: Boolean, fraction: Double): DataFrame ///////////////////////////////////////////////////////////////////////////// /** * Returns a new [[DataFrame]] by adding a column. */ - override def addColumn(colName: String, col: Column): DataFrame + def addColumn(colName: String, col: Column): DataFrame /** * Returns the first `n` rows. */ - override def head(n: Int): Array[Row] + def head(n: Int): Array[Row] /** * Returns the first row. */ - override def head(): Row + def head(): Row /** * Returns the first row. Alias for head(). @@ -453,7 +453,17 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { /** * Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. */ - override def rdd: RDD[Row] + def rdd: RDD[Row] + + /** + * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + */ + def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD() + + /** + * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. + */ + def javaRDD: JavaRDD[Row] = toJavaRDD /** * Registers this RDD as a temporary table using the given name. 
The lifetime of this temporary @@ -461,14 +471,14 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * * @group schema */ - override def registerTempTable(tableName: String): Unit + def registerTempTable(tableName: String): Unit /** * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema. * Files that are written out using this method can be read back in as a [[DataFrame]] * using the `parquetFile` function in [[SQLContext]]. */ - override def saveAsParquetFile(path: String): Unit + def saveAsParquetFile(path: String): Unit /** * :: Experimental :: @@ -481,7 +491,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * be the target of an `insertInto`. */ @Experimental - override def saveAsTable(tableName: String): Unit + def saveAsTable(tableName: String): Unit /** * :: Experimental :: @@ -494,7 +504,7 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * be the target of an `insertInto`. */ @Experimental - override def saveAsTable( + def saveAsTable( tableName: String, dataSourceName: String, option: (String, String), @@ -511,22 +521,22 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * be the target of an `insertInto`. */ @Experimental - override def saveAsTable( + def saveAsTable( tableName: String, dataSourceName: String, options: java.util.Map[String, String]): Unit @Experimental - override def save(path: String): Unit + def save(path: String): Unit @Experimental - override def save( + def save( dataSourceName: String, option: (String, String), options: (String, String)*): Unit @Experimental - override def save( + def save( dataSourceName: String, options: java.util.Map[String, String]): Unit @@ -535,12 +545,20 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. 
*/ @Experimental - override def insertInto(tableName: String, overwrite: Boolean): Unit + def insertInto(tableName: String, overwrite: Boolean): Unit + + /** + * :: Experimental :: + * Adds the rows from this RDD to the specified table. + * Throws an exception if the table already exists. + */ + @Experimental + def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false) /** * Returns the content of the [[DataFrame]] as a RDD of JSON strings. */ - override def toJSON: RDD[String] + def toJSON: RDD[String] //////////////////////////////////////////////////////////////////////////// // for Python API diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index f84dbf32fa..49fd131534 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import java.util.{List => JList} - import scala.language.implicitConversions import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -42,13 +40,17 @@ import org.apache.spark.util.Utils /** - * See [[DataFrame]] for documentation. + * Internal implementation of [[DataFrame]]. Users of the API should use [[DataFrame]] directly. */ private[sql] class DataFrameImpl protected[sql]( override val sqlContext: SQLContext, val queryExecution: SQLContext#QueryExecution) extends DataFrame { + /** + * A constructor that automatically analyzes the logical plan. This reports error eagerly + * as the [[DataFrame]] is constructed. 
+ */ def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = { this(sqlContext, { val qe = sqlContext.executePlan(logicalPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala index d3acd41bbf..6d0f3e8ce3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala @@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Aggregate /** * A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]]. */ -class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) - extends GroupedDataFrameApi { +class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) { private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): DataFrame = { val namedGroupingExprs = groupingExprs.map { @@ -72,7 +71,7 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr * )) * }}} */ - override def agg(exprs: Map[String, String]): DataFrame = { + def agg(exprs: Map[String, String]): DataFrame = { exprs.map { case (colName, expr) => val a = strToExpr(expr)(df(colName).expr) Alias(a, a.toString)() @@ -109,7 +108,7 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr * }}} */ @scala.annotation.varargs - override def agg(expr: Column, exprs: Column*): DataFrame = { + def agg(expr: Column, exprs: Column*): DataFrame = { val aggExprs = (expr +: exprs).map(_.expr).map { case expr: NamedExpression => expr case expr: Expression => Alias(expr, expr.toString)() @@ -121,35 +120,35 @@ class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expr * Count the number of rows for each group. * The resulting [[DataFrame]] will also contain the grouping columns. 
*/ - override def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")()) + def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")()) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. * The resulting [[DataFrame]] will also contain the grouping columns. */ - override def mean(): DataFrame = aggregateNumericColumns(Average) + def mean(): DataFrame = aggregateNumericColumns(Average) /** * Compute the max value for each numeric columns for each group. * The resulting [[DataFrame]] will also contain the grouping columns. */ - override def max(): DataFrame = aggregateNumericColumns(Max) + def max(): DataFrame = aggregateNumericColumns(Max) /** * Compute the mean value for each numeric columns for each group. * The resulting [[DataFrame]] will also contain the grouping columns. */ - override def avg(): DataFrame = aggregateNumericColumns(Average) + def avg(): DataFrame = aggregateNumericColumns(Average) /** * Compute the min value for each numeric column for each group. * The resulting [[DataFrame]] will also contain the grouping columns. */ - override def min(): DataFrame = aggregateNumericColumns(Min) + def min(): DataFrame = aggregateNumericColumns(Min) /** * Compute the sum for each numeric columns for each group. * The resulting [[DataFrame]] will also contain the grouping columns. */ - override def sum(): DataFrame = aggregateNumericColumns(Sum) + def sum(): DataFrame = aggregateNumericColumns(Sum) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala new file mode 100644 index 0000000000..38e6382f17 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + + +/** + * An internal interface defining the RDD-like methods for [[DataFrame]]. + * Please use [[DataFrame]] directly, and do NOT use this. + */ +private[sql] trait RDDApi[T] { + + def cache(): this.type = persist() + + def persist(): this.type + + def persist(newLevel: StorageLevel): this.type + + def unpersist(): this.type = unpersist(blocking = false) + + def unpersist(blocking: Boolean): this.type + + def map[R: ClassTag](f: T => R): RDD[R] + + def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R] + + def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R] + + def foreach(f: T => Unit): Unit + + def foreachPartition(f: Iterator[T] => Unit): Unit + + def take(n: Int): Array[T] + + def collect(): Array[T] + + def collectAsList(): java.util.List[T] + + def count(): Long + + def first(): T + + def repartition(numPartitions: Int): DataFrame +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala deleted file mode 100644 index c4a00cdb20..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ /dev/null @@ -1,326 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. 
See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql - -import scala.reflect.ClassTag - -import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.storage.StorageLevel - - -/** - * An internal interface defining the RDD-like methods for [[DataFrame]]. - * Please use [[DataFrame]] directly, and do NOT use this. - */ -private[sql] trait RDDApi[T] { - - def cache(): this.type = persist() - - def persist(): this.type - - def persist(newLevel: StorageLevel): this.type - - def unpersist(): this.type = unpersist(blocking = false) - - def unpersist(blocking: Boolean): this.type - - def map[R: ClassTag](f: T => R): RDD[R] - - def flatMap[R: ClassTag](f: T => TraversableOnce[R]): RDD[R] - - def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R] - - def foreach(f: T => Unit): Unit - - def foreachPartition(f: Iterator[T] => Unit): Unit - - def take(n: Int): Array[T] - - def collect(): Array[T] - - def collectAsList(): java.util.List[T] - - def count(): Long - - def first(): T - - def repartition(numPartitions: Int): DataFrame -} - - -/** - * An internal interface defining data frame related methods in [[DataFrame]]. 
- * Please use [[DataFrame]] directly, and do NOT use this. - */ -private[sql] trait DataFrameSpecificApi { - - def schema: StructType - - def printSchema(): Unit - - def dtypes: Array[(String, String)] - - def columns: Array[String] - - def head(): Row - - def head(n: Int): Array[Row] - - ///////////////////////////////////////////////////////////////////////////// - // Relational operators - ///////////////////////////////////////////////////////////////////////////// - def apply(colName: String): Column - - def apply(projection: Product): DataFrame - - @scala.annotation.varargs - def select(cols: Column*): DataFrame - - @scala.annotation.varargs - def select(col: String, cols: String*): DataFrame - - def apply(condition: Column): DataFrame - - def as(name: String): DataFrame - - def filter(condition: Column): DataFrame - - def where(condition: Column): DataFrame - - @scala.annotation.varargs - def groupBy(cols: Column*): GroupedDataFrame - - @scala.annotation.varargs - def groupBy(col1: String, cols: String*): GroupedDataFrame - - def agg(exprs: Map[String, String]): DataFrame - - def agg(exprs: java.util.Map[String, String]): DataFrame - - @scala.annotation.varargs - def agg(expr: Column, exprs: Column*): DataFrame - - @scala.annotation.varargs - def sort(sortExpr: Column, sortExprs: Column*): DataFrame - - @scala.annotation.varargs - def sort(sortCol: String, sortCols: String*): DataFrame - - @scala.annotation.varargs - def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame - - @scala.annotation.varargs - def orderBy(sortCol: String, sortCols: String*): DataFrame - - def join(right: DataFrame): DataFrame - - def join(right: DataFrame, joinExprs: Column): DataFrame - - def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame - - def limit(n: Int): DataFrame - - def unionAll(other: DataFrame): DataFrame - - def intersect(other: DataFrame): DataFrame - - def except(other: DataFrame): DataFrame - - def sample(withReplacement: Boolean, 
fraction: Double, seed: Long): DataFrame - - def sample(withReplacement: Boolean, fraction: Double): DataFrame - - ///////////////////////////////////////////////////////////////////////////// - // Column mutation - ///////////////////////////////////////////////////////////////////////////// - def addColumn(colName: String, col: Column): DataFrame - - ///////////////////////////////////////////////////////////////////////////// - // I/O and interaction with other frameworks - ///////////////////////////////////////////////////////////////////////////// - - def rdd: RDD[Row] - - def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD() - - def toJSON: RDD[String] - - def registerTempTable(tableName: String): Unit - - def saveAsParquetFile(path: String): Unit - - @Experimental - def saveAsTable(tableName: String): Unit - - @Experimental - def saveAsTable( - tableName: String, - dataSourceName: String, - option: (String, String), - options: (String, String)*): Unit - - @Experimental - def saveAsTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String]): Unit - - @Experimental - def save(path: String): Unit - - @Experimental - def save( - dataSourceName: String, - option: (String, String), - options: (String, String)*): Unit - - @Experimental - def save( - dataSourceName: String, - options: java.util.Map[String, String]): Unit - - @Experimental - def insertInto(tableName: String, overwrite: Boolean): Unit - - @Experimental - def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false) - - ///////////////////////////////////////////////////////////////////////////// - // Stat functions - ///////////////////////////////////////////////////////////////////////////// -// def describe(): Unit -// -// def mean(): Unit -// -// def max(): Unit -// -// def min(): Unit -} - - -/** - * An internal interface defining expression APIs for [[DataFrame]]. - * Please use [[DataFrame]] and [[Column]] directly, and do NOT use this. 
- */ -private[sql] trait ExpressionApi { - - def isComputable: Boolean - - def unary_- : Column - def unary_! : Column - def unary_~ : Column - - def + (other: Column): Column - def + (other: Any): Column - def - (other: Column): Column - def - (other: Any): Column - def * (other: Column): Column - def * (other: Any): Column - def / (other: Column): Column - def / (other: Any): Column - def % (other: Column): Column - def % (other: Any): Column - def & (other: Column): Column - def & (other: Any): Column - def | (other: Column): Column - def | (other: Any): Column - def ^ (other: Column): Column - def ^ (other: Any): Column - - def && (other: Column): Column - def && (other: Boolean): Column - def || (other: Column): Column - def || (other: Boolean): Column - - def < (other: Column): Column - def < (other: Any): Column - def <= (other: Column): Column - def <= (other: Any): Column - def > (other: Column): Column - def > (other: Any): Column - def >= (other: Column): Column - def >= (other: Any): Column - def === (other: Column): Column - def === (other: Any): Column - def equalTo(other: Column): Column - def equalTo(other: Any): Column - def <=> (other: Column): Column - def <=> (other: Any): Column - def !== (other: Column): Column - def !== (other: Any): Column - - @scala.annotation.varargs - def in(list: Column*): Column - - def like(other: String): Column - def rlike(other: String): Column - - def contains(other: Column): Column - def contains(other: Any): Column - def startsWith(other: Column): Column - def startsWith(other: String): Column - def endsWith(other: Column): Column - def endsWith(other: String): Column - - def substr(startPos: Column, len: Column): Column - def substr(startPos: Int, len: Int): Column - - def isNull: Column - def isNotNull: Column - - def getItem(ordinal: Int): Column - def getField(fieldName: String): Column - - def cast(to: DataType): Column - def cast(to: String): Column - - def asc: Column - def desc: Column - - def as(alias: 
String): Column -} - - -/** - * An internal interface defining aggregation APIs for [[DataFrame]]. - * Please use [[DataFrame]] and [[GroupedDataFrame]] directly, and do NOT use this. - */ -private[sql] trait GroupedDataFrameApi { - - def agg(exprs: Map[String, String]): DataFrame - - @scala.annotation.varargs - def agg(expr: Column, exprs: Column*): DataFrame - - def avg(): DataFrame - - def mean(): DataFrame - - def min(): DataFrame - - def max(): DataFrame - - def sum(): DataFrame - - def count(): DataFrame - - // TODO: Add var, std -} |