author    Cheng Lian <lian@databricks.com>    2016-03-17 21:31:11 -0700
committer Reynold Xin <rxin@databricks.com>   2016-03-17 21:31:11 -0700
commit    10ef4f3e77b3a2a2770a1c869a236203560d4e6d (patch)
tree      0af17cec81fa5a0070c296af54bf1365b2999041
parent    90a1d8db70b073925ced41c2ba61bebc9060e527 (diff)
[SPARK-13826][SQL] Revises Dataset ScalaDoc
## What changes were proposed in this pull request?

This PR revises the Dataset API ScalaDoc. All public methods are divided into the following groups:

* `groupname basic`: Basic Dataset functions
* `groupname action`: Actions
* `groupname untypedrel`: Untyped Language Integrated Relational Queries
* `groupname typedrel`: Typed Language Integrated Relational Queries
* `groupname func`: Functional Transformations
* `groupname rdd`: RDD Operations
* `groupname output`: Output Operations

The `since` tags and sample code are also updated. We may want to add more sample code for typed APIs (see the sketch after this description).

## How was this patch tested?

Documentation change only; checked by building unidoc locally.

Author: Cheng Lian <lian@databricks.com>

Closes #11769 from liancheng/spark-13826-ds-api-doc.
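A rough sketch of the kind of typed-API sample code this could grow into (illustrative only, not part of this patch; `Person` and the Parquet path are placeholders):

```scala
import sqlContext.implicits._

case class Person(name: String, age: Long)

// Move from the untyped Dataset[Row] into the typed world with an explicit encoder.
val people: Dataset[Person] = sqlContext.read.parquet("...").as[Person]

// Typed, functional transformations operate directly on Person objects.
val adultNames: Dataset[String] = people.filter(_.age >= 18).map(_.name)
```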
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  |  841
1 file changed, 522 insertions, 319 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 75f1ffd51f..209bac3629 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,18 +61,34 @@ private[sql] object Dataset {
}
/**
- * :: Experimental ::
- * A distributed collection of data organized into named columns.
+ * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
+ * using functional or relational operations.
*
- * A [[DataFrame]] is equivalent to a relational table in Spark SQL. The following example creates
- * a [[DataFrame]] by pointing Spark SQL to a Parquet data set.
+ * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ *
+ * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
+ * in the encoded form. This representation allows for additional logical operations and
+ * enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
+ * an object.
+ * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
+ * used to serialize the object into a binary format. Encoders are also capable of mapping the
+ * schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
+ * reflection based serialization. Operations that change the type of object stored in the
+ * dataset also need an encoder for the new type.
+ *
+ * Unlike DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
+ * a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a
+ * generic [[Row]] container. Since Spark 2.0.0, DataFrame is simply a type alias of
+ * `Dataset[Row]`.
+ *
+ * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
* {{{
* val people = sqlContext.read.parquet("...") // in Scala
- * DataFrame people = sqlContext.read().parquet("...") // in Java
+ * Dataset<Row> people = sqlContext.read().parquet("...") // in Java
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[DataFrame]] (this class), [[Column]], and [[functions]].
+ * defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
*
* To select a column from the data frame, use the `apply` method in Scala and `col` in Java.
* {{{
@@ -89,7 +105,7 @@ private[sql] object Dataset {
*
* A more concrete example in Scala:
* {{{
- * // To create DataFrame using SQLContext
+ * // To create Dataset[Row] using SQLContext
* val people = sqlContext.read.parquet("...")
* val department = sqlContext.read.parquet("...")
*
@@ -101,9 +117,9 @@ private[sql] object Dataset {
*
* and in Java:
* {{{
- * // To create DataFrame using SQLContext
- * DataFrame people = sqlContext.read().parquet("...");
- * DataFrame department = sqlContext.read().parquet("...");
+ * // To create Dataset<Row> using SQLContext
+ * Dataset<Row> people = sqlContext.read().parquet("...");
+ * Dataset<Row> department = sqlContext.read().parquet("...");
*
* people.filter("age".gt(30))
* .join(department, people.col("deptId").equalTo(department("id")))
@@ -111,14 +127,16 @@ private[sql] object Dataset {
* .agg(avg(people.col("salary")), max(people.col("age")));
* }}}
*
- * @groupname basic Basic DataFrame functions
- * @groupname dfops Language Integrated Queries
+ * @groupname basic Basic Dataset functions
+ * @groupname action Actions
+ * @groupname untypedrel Untyped Language Integrated Relational Queries
+ * @groupname typedrel Typed Language Integrated Relational Queries
+ * @groupname func Functional Transformations
* @groupname rdd RDD Operations
* @groupname output Output Operations
- * @groupname action Actions
- * @since 1.3.0
+ *
+ * @since 1.6.0
*/
-@Experimental
class Dataset[T] private[sql](
@transient override val sqlContext: SQLContext,
@DeveloperApi @transient override val queryExecution: QueryExecution,
@@ -127,7 +145,7 @@ class Dataset[T] private[sql](
queryExecution.assertAnalyzed()
- // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
+ // Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
// you wrap it with `withNewExecutionId` if this action doesn't call other actions.
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
@@ -190,6 +208,7 @@ class Dataset[T] private[sql](
/**
* Compose the string representing rows for output
+ *
* @param _numRows Number of rows to show
* @param truncate Whether to truncate long strings and align cells right
*/
@@ -222,18 +241,33 @@ class Dataset[T] private[sql](
}
/**
- * Returns the object itself.
+ * :: Experimental ::
+ * Converts this strongly typed collection of data to a generic DataFrame. In contrast to the
+ * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
+ * objects that allow fields to be accessed by ordinal or name.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
// This is declared with parentheses to prevent the Scala compiler from treating
- // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+ // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+ @Experimental
def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
/**
* :: Experimental ::
- * Converts this [[DataFrame]] to a strongly-typed [[Dataset]] containing objects of the
- * specified type, `U`.
+ * Returns a new [[Dataset]] where each record has been mapped on to the specified type. The
+ * method used to map columns depends on the type of `U`:
+ * - When `U` is a class, fields for the class will be mapped to columns of the same name
+ * (case sensitivity is determined by `spark.sql.caseSensitive`)
+ * - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will
+ * be assigned to `_1`).
+ * - When `U` is a primitive type (i.e. String, Int, etc.), then the first column of the
+ * [[DataFrame]] will be used.
+ *
+ * If the schema of the [[Dataset]] does not match the desired `U` type, you can use `select`
+ * along with `alias` or `as` to rearrange or rename as required.
+ *
* @group basic
* @since 1.6.0
*/
@@ -241,15 +275,17 @@ class Dataset[T] private[sql](
def as[U : Encoder]: Dataset[U] = Dataset[U](sqlContext, logicalPlan)
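Illustrative usage of `as[U]` (a sketch, not part of this patch; `Person` and the column names are placeholders, and `import sqlContext.implicits._` is assumed for the encoders):
{{{
case class Person(name: String, age: Long)

// When `U` is a case class, columns are matched to fields by name.
val people: Dataset[Person] = sqlContext.read.parquet("...").as[Person]

// When `U` is a tuple, columns are matched by ordinal.
val pairs: Dataset[(String, Long)] = people.select($"name", $"age").as[(String, Long)]
}}}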
/**
- * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
- * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
+ * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
+ * This can be quite convenient in conversion from an RDD of tuples into a [[DataFrame]] with
+ * meaningful names. For example:
* {{{
* val rdd: RDD[(Int, String)] = ...
- * rdd.toDF() // this implicit conversion creates a DataFrame with column name _1 and _2
+ * rdd.toDF() // this implicit conversion creates a DataFrame with column name `_1` and `_2`
* rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
+ *
* @group basic
- * @since 1.3.0
+ * @since 2.0.0
*/
@scala.annotation.varargs
def toDF(colNames: String*): DataFrame = {
@@ -265,16 +301,18 @@ class Dataset[T] private[sql](
}
/**
- * Returns the schema of this [[DataFrame]].
+ * Returns the schema of this [[Dataset]].
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def schema: StructType = queryExecution.analyzed.schema
/**
* Prints the schema to the console in a nice tree format.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
// scalastyle:off println
override def printSchema(): Unit = println(schema.treeString)
@@ -282,8 +320,9 @@ class Dataset[T] private[sql](
/**
* Prints the plans (logical and physical) to the console for debugging purposes.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
override def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
@@ -296,14 +335,17 @@ class Dataset[T] private[sql](
/**
* Prints the physical plan to the console for debugging purposes.
- * @since 1.3.0
+ *
+ * @group basic
+ * @since 1.6.0
*/
override def explain(): Unit = explain(extended = false)
/**
* Returns all column names and their data types as an array.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def dtypes: Array[(String, String)] = schema.fields.map { field =>
(field.name, field.dataType.toString)
@@ -311,22 +353,24 @@ class Dataset[T] private[sql](
/**
* Returns all column names as an array.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def columns: Array[String] = schema.fields.map(_.name)
/**
* Returns true if the `collect` and `take` methods can be run locally
* (without any Spark executors).
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
/**
- * Displays the [[DataFrame]] in a tabular form. Strings more than 20 characters will be
- * truncated, and all cells will be aligned right. For example:
+ * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
+ * and all cells will be aligned right. For example:
* {{{
* year month AVG('Adj Close) MAX('Adj Close)
* 1980 12 0.503218 0.595103
@@ -335,34 +379,36 @@ class Dataset[T] private[sql](
* 1983 03 0.410516 0.442194
* 1984 04 0.450090 0.483521
* }}}
+ *
* @param numRows Number of rows to show
*
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def show(numRows: Int): Unit = show(numRows, truncate = true)
/**
- * Displays the top 20 rows of [[DataFrame]] in a tabular form. Strings more than 20 characters
+ * Displays the top 20 rows of [[Dataset]] in a tabular form. Strings more than 20 characters
* will be truncated, and all cells will be aligned right.
+ *
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def show(): Unit = show(20)
/**
- * Displays the top 20 rows of [[DataFrame]] in a tabular form.
+ * Displays the top 20 rows of [[Dataset]] in a tabular form.
*
* @param truncate Whether to truncate long strings. If true, strings more than 20 characters will
- * be truncated and all cells will be aligned right
+ * be truncated and all cells will be aligned right
*
* @group action
- * @since 1.5.0
+ * @since 1.6.0
*/
def show(truncate: Boolean): Unit = show(20, truncate)
/**
- * Displays the [[DataFrame]] in a tabular form. For example:
+ * Displays the [[Dataset]] in a tabular form. For example:
* {{{
* year month AVG('Adj Close) MAX('Adj Close)
* 1980 12 0.503218 0.595103
@@ -376,7 +422,7 @@ class Dataset[T] private[sql](
* be truncated and all cells will be aligned right
*
* @group action
- * @since 1.5.0
+ * @since 1.6.0
*/
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
@@ -386,11 +432,11 @@ class Dataset[T] private[sql](
* Returns a [[DataFrameNaFunctions]] for working with missing data.
* {{{
* // Dropping rows containing any null values.
- * df.na.drop()
+ * ds.na.drop()
* }}}
*
- * @group dfops
- * @since 1.3.1
+ * @group untypedrel
+ * @since 1.6.0
*/
def na: DataFrameNaFunctions = new DataFrameNaFunctions(toDF())
@@ -398,11 +444,11 @@ class Dataset[T] private[sql](
* Returns a [[DataFrameStatFunctions]] for working with statistic functions.
* {{{
* // Finding frequent items in column with name 'a'.
- * df.stat.freqItems(Seq("a"))
+ * ds.stat.freqItems(Seq("a"))
* }}}
*
- * @group dfops
- * @since 1.4.0
+ * @group untypedrel
+ * @since 1.6.0
*/
def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
@@ -412,8 +458,9 @@ class Dataset[T] private[sql](
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame): DataFrame = withPlan {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
@@ -436,8 +483,9 @@ class Dataset[T] private[sql](
*
* @param right Right side of the join operation.
* @param usingColumn Name of the column to join on. This column must exist on both sides.
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
join(right, Seq(usingColumn))
@@ -460,8 +508,9 @@ class Dataset[T] private[sql](
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. These columns must exist on both sides.
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
join(right, usingColumns, "inner")
@@ -480,8 +529,9 @@ class Dataset[T] private[sql](
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. These columns must exist on both sides.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
- * @group dfops
- * @since 1.6.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
@@ -507,8 +557,9 @@ class Dataset[T] private[sql](
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
@@ -529,8 +580,9 @@ class Dataset[T] private[sql](
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
// Note that in this function, we introduce a hack in the case of self-join to automatically
@@ -576,6 +628,7 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* Joins this [[Dataset]] returning a [[Tuple2]] for each pair where `condition` evaluates to
* true.
*
@@ -590,8 +643,11 @@ class Dataset[T] private[sql](
* @param other Right side of the join.
* @param condition Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
val left = this.logicalPlan
val right = other.logicalPlan
@@ -620,24 +676,28 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair
* where `condition` evaluates to true.
*
* @param other Right side of the join.
* @param condition Join expression.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
joinWith(other, condition, "inner")
}
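Illustrative usage of `joinWith` (a sketch, not part of this patch; `people`, `departments`, and the case classes are placeholders):
{{{
case class Person(name: String, deptId: Int)
case class Department(id: Int, name: String)

// Both sides of the join are kept as whole objects in a typed pair.
val joined: Dataset[(Person, Department)] =
  people.joinWith(departments, people("deptId") === departments("id"))
}}}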
/**
- * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+ * Returns a new [[Dataset]] with each partition sorted by the given expressions.
*
* This is the same operation as "SORT BY" in SQL (Hive QL).
*
- * @group dfops
- * @since 1.6.0
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
@@ -645,12 +705,12 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+ * Returns a new [[Dataset]] with each partition sorted by the given expressions.
*
* This is the same operation as "SORT BY" in SQL (Hive QL).
*
- * @group dfops
- * @since 1.6.0
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def sortWithinPartitions(sortExprs: Column*): Dataset[T] = {
@@ -658,15 +718,16 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
+ * Returns a new [[Dataset]] sorted by the specified column, all in ascending order.
* {{{
* // The following 3 are equivalent
- * df.sort("sortcol")
- * df.sort($"sortcol")
- * df.sort($"sortcol".asc)
+ * ds.sort("sortcol")
+ * ds.sort($"sortcol")
+ * ds.sort($"sortcol".asc)
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): Dataset[T] = {
@@ -674,12 +735,13 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] sorted by the given expressions. For example:
+ * Returns a new [[Dataset]] sorted by the given expressions. For example:
* {{{
- * df.sort($"col1", $"col2".desc)
+ * ds.sort($"col1", $"col2".desc)
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): Dataset[T] = {
@@ -687,19 +749,21 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] sorted by the given expressions.
+ * Returns a new [[Dataset]] sorted by the given expressions.
* This is an alias of the `sort` function.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)
/**
- * Returns a new [[DataFrame]] sorted by the given expressions.
+ * Returns a new [[Dataset]] sorted by the given expressions.
* This is an alias of the `sort` function.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)
@@ -707,16 +771,18 @@ class Dataset[T] private[sql](
/**
* Selects a column based on the column name and returns it as a [[Column]].
* Note that the column name can also reference a nested column like `a.b`.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def apply(colName: String): Column = col(colName)
/**
* Selects a column based on the column name and returns it as a [[Column]].
* Note that the column name can also reference a nested column like `a.b`.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def col(colName: String): Column = colName match {
case "*" =>
@@ -727,42 +793,47 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with an alias set.
- * @group dfops
- * @since 1.3.0
+ * Returns a new [[Dataset]] with an alias set.
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, logicalPlan)
}
/**
- * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
- * @group dfops
- * @since 1.3.0
+ * (Scala-specific) Returns a new [[Dataset]] with an alias set.
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def as(alias: Symbol): Dataset[T] = as(alias.name)
/**
- * Returns a new [[DataFrame]] with an alias set. Same as `as`.
- * @group dfops
- * @since 1.6.0
+ * Returns a new [[Dataset]] with an alias set. Same as `as`.
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def alias(alias: String): Dataset[T] = as(alias)
/**
- * (Scala-specific) Returns a new [[DataFrame]] with an alias set. Same as `as`.
- * @group dfops
- * @since 1.6.0
+ * (Scala-specific) Returns a new [[Dataset]] with an alias set. Same as `as`.
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def alias(alias: Symbol): Dataset[T] = as(alias)
/**
* Selects a set of column-based expressions.
* {{{
- * df.select($"colA", $"colB" + 1)
+ * ds.select($"colA", $"colB" + 1)
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
@@ -775,11 +846,12 @@ class Dataset[T] private[sql](
*
* {{{
* // The following two are equivalent:
- * df.select("colA", "colB")
- * df.select($"colA", $"colB")
+ * ds.select("colA", "colB")
+ * ds.select($"colA", $"colB")
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
@@ -790,11 +862,12 @@ class Dataset[T] private[sql](
*
* {{{
* // The following are equivalent:
- * df.selectExpr("colA", "colB as newName", "abs(colC)")
- * df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
+ * ds.selectExpr("colA", "colB as newName", "abs(colC)")
+ * ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame = {
@@ -804,14 +877,18 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* Returns a new [[Dataset]] by computing the given [[Column]] expression for each element.
*
* {{{
* val ds = Seq(1, 2, 3).toDS()
* val newDS = ds.select(expr("value + 1").as[Int])
* }}}
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
new Dataset[U1](
sqlContext,
@@ -838,16 +915,24 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] =
selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]]
/**
+ * :: Experimental ::
* Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
@@ -855,9 +940,13 @@ class Dataset[T] private[sql](
selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]]
/**
+ * :: Experimental ::
* Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
@@ -866,9 +955,13 @@ class Dataset[T] private[sql](
selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]]
/**
+ * :: Experimental ::
* Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
* @since 1.6.0
*/
+ @Experimental
def select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
@@ -881,11 +974,12 @@ class Dataset[T] private[sql](
* Filters rows using the given condition.
* {{{
* // The following are equivalent:
- * peopleDf.filter($"age" > 15)
- * peopleDf.where($"age" > 15)
+ * peopleDs.filter($"age" > 15)
+ * peopleDs.where($"age" > 15)
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
Filter(condition.expr, logicalPlan)
@@ -894,10 +988,11 @@ class Dataset[T] private[sql](
/**
* Filters rows using the given SQL expression.
* {{{
- * peopleDf.filter("age > 15")
+ * peopleDs.filter("age > 15")
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def filter(conditionExpr: String): Dataset[T] = {
filter(Column(sqlContext.sessionState.sqlParser.parseExpression(conditionExpr)))
@@ -907,42 +1002,45 @@ class Dataset[T] private[sql](
* Filters rows using the given condition. This is an alias for `filter`.
* {{{
* // The following are equivalent:
- * peopleDf.filter($"age" > 15)
- * peopleDf.where($"age" > 15)
+ * peopleDs.filter($"age" > 15)
+ * peopleDs.where($"age" > 15)
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def where(condition: Column): Dataset[T] = filter(condition)
/**
* Filters rows using the given SQL expression.
* {{{
- * peopleDf.where("age > 15")
+ * peopleDs.where("age > 15")
* }}}
- * @group dfops
- * @since 1.5.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def where(conditionExpr: String): Dataset[T] = {
filter(Column(sqlContext.sessionState.sqlParser.parseExpression(conditionExpr)))
}
/**
- * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
- * See [[GroupedData]] for all the available aggregate functions.
+ * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them. See
+ * [[GroupedData]] for all the available aggregate functions.
*
* {{{
* // Compute the average for all numeric columns grouped by department.
- * df.groupBy($"department").avg()
+ * ds.groupBy($"department").avg()
*
* // Compute the max age and average salary, grouped by department and gender.
- * df.groupBy($"department", $"gender").agg(Map(
+ * ds.groupBy($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def groupBy(cols: Column*): GroupedData = {
@@ -950,22 +1048,23 @@ class Dataset[T] private[sql](
}
/**
- * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
+ * Create a multi-dimensional rollup for the current [[Dataset]] using the specified columns,
* so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* {{{
* // Compute the average for all numeric columns rolluped by department and group.
- * df.rollup($"department", $"group").avg()
+ * ds.rollup($"department", $"group").avg()
*
* // Compute the max age and average salary, rolluped by department and gender.
- * df.rollup($"department", $"gender").agg(Map(
+ * ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def rollup(cols: Column*): GroupedData = {
@@ -973,28 +1072,29 @@ class Dataset[T] private[sql](
}
/**
- * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
+ * Create a multi-dimensional cube for the current [[Dataset]] using the specified columns,
* so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* {{{
* // Compute the average for all numeric columns cubed by department and group.
- * df.cube($"department", $"group").avg()
+ * ds.cube($"department", $"group").avg()
*
* // Compute the max age and average salary, cubed by department and gender.
- * df.cube($"department", $"gender").agg(Map(
+ * ds.cube($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
/**
- * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+ * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* This is a variant of groupBy that can only group by existing columns using column names
@@ -1002,16 +1102,16 @@ class Dataset[T] private[sql](
*
* {{{
* // Compute the average for all numeric columns grouped by department.
- * df.groupBy("department").avg()
+ * ds.groupBy("department").avg()
*
* // Compute the max age and average salary, grouped by department and gender.
- * df.groupBy($"department", $"gender").agg(Map(
+ * ds.groupBy($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.3.0
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): GroupedData = {
@@ -1020,26 +1120,38 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* (Scala-specific)
* Reduces the elements of this [[Dataset]] using the specified binary function. The given `func`
* must be commutative and associative or the result may be non-deterministic.
+ *
+ * @group action
* @since 1.6.0
*/
+ @Experimental
def reduce(func: (T, T) => T): T = rdd.reduce(func)
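Illustrative usage of `reduce` (a sketch, not part of this patch; assumes `import sqlContext.implicits._` for `toDS()`):
{{{
val ds = Seq(1, 2, 3, 4).toDS()

// Addition is commutative and associative, so the result is deterministic.
val sum = ds.reduce(_ + _)   // 10
}}}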
/**
+ * :: Experimental ::
* (Java-specific)
* Reduces the elements of this Dataset using the specified binary function. The given `func`
* must be commutative and associative or the result may be non-deterministic.
+ *
+ * @group action
* @since 1.6.0
*/
+ @Experimental
def reduce(func: ReduceFunction[T]): T = reduce(func.call(_, _))
/**
+ * :: Experimental ::
* (Scala-specific)
* Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
- * @since 1.6.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
+ @Experimental
def groupByKey[K: Encoder](func: T => K): GroupedDataset[K, T] = {
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
@@ -1054,9 +1166,13 @@ class Dataset[T] private[sql](
}
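Illustrative usage of `groupByKey` (a sketch, not part of this patch; `Employee` and `employees` are placeholders, `mapGroups` comes from [[GroupedDataset]], and `import sqlContext.implicits._` is assumed):
{{{
case class Employee(department: String, salary: Double)

// Group by a derived key, then collapse each group to a single record.
val headcounts: Dataset[(String, Int)] =
  employees.groupByKey(_.department)
           .mapGroups { (dept, members) => (dept, members.size) }
}}}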
/**
+ * :: Experimental ::
* Returns a [[GroupedDataset]] where the data is grouped by the given [[Column]] expressions.
- * @since 1.6.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
+ @Experimental
@scala.annotation.varargs
def groupByKey(cols: Column*): GroupedDataset[Row, T] = {
val withKeyColumns = logicalPlan.output ++ cols.map(_.expr).map(UnresolvedAlias(_))
@@ -1075,15 +1191,19 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* (Java-specific)
* Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
- * @since 1.6.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
+ @Experimental
def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): GroupedDataset[K, T] =
groupByKey(func.call(_))(encoder)
/**
- * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
+ * Create a multi-dimensional rollup for the current [[Dataset]] using the specified columns,
* so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
@@ -1092,16 +1212,17 @@ class Dataset[T] private[sql](
*
* {{{
* // Compute the average for all numeric columns rolluped by department and group.
- * df.rollup("department", "group").avg()
+ * ds.rollup("department", "group").avg()
*
* // Compute the max age and average salary, rolluped by department and gender.
- * df.rollup($"department", $"gender").agg(Map(
+ * ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def rollup(col1: String, cols: String*): GroupedData = {
@@ -1110,7 +1231,7 @@ class Dataset[T] private[sql](
}
/**
- * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
+ * Create a multi-dimensional cube for the current [[Dataset]] using the specified columns,
* so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
@@ -1119,16 +1240,16 @@ class Dataset[T] private[sql](
*
* {{{
* // Compute the average for all numeric columns cubed by department and group.
- * df.cube("department", "group").avg()
+ * ds.cube("department", "group").avg()
*
* // Compute the max age and average salary, cubed by department and gender.
- * df.cube($"department", $"gender").agg(Map(
+ * ds.cube($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
- * @group dfops
- * @since 1.4.0
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def cube(col1: String, cols: String*): GroupedData = {
@@ -1137,71 +1258,77 @@ class Dataset[T] private[sql](
}
/**
- * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
+ * (Scala-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
- * // df.agg(...) is a shorthand for df.groupBy().agg(...)
- * df.agg("age" -> "max", "salary" -> "avg")
- * df.groupBy().agg("age" -> "max", "salary" -> "avg")
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg("age" -> "max", "salary" -> "avg")
+ * ds.groupBy().agg("age" -> "max", "salary" -> "avg")
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs : _*)
}
/**
- * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
+ * (Scala-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
- * // df.agg(...) is a shorthand for df.groupBy().agg(...)
- * df.agg(Map("age" -> "max", "salary" -> "avg"))
- * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(Map("age" -> "max", "salary" -> "avg"))
+ * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
/**
- * (Java-specific) Aggregates on the entire [[DataFrame]] without groups.
+ * (Java-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
- * // df.agg(...) is a shorthand for df.groupBy().agg(...)
- * df.agg(Map("age" -> "max", "salary" -> "avg"))
- * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(Map("age" -> "max", "salary" -> "avg"))
+ * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
/**
- * Aggregates on the entire [[DataFrame]] without groups.
+ * Aggregates on the entire [[Dataset]] without groups.
* {{{
- * // df.agg(...) is a shorthand for df.groupBy().agg(...)
- * df.agg(max($"age"), avg($"salary"))
- * df.groupBy().agg(max($"age"), avg($"salary"))
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(max($"age"), avg($"salary"))
+ * ds.groupBy().agg(max($"age"), avg($"salary"))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)
/**
- * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
- * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
- * @group dfops
- * @since 1.3.0
+ * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
+ * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def limit(n: Int): Dataset[T] = withTypedPlan {
Limit(Literal(n), logicalPlan)
}
/**
- * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
+ * Returns a new [[Dataset]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan {
// This breaks caching, but it's usually ok because it addresses a very specific use case:
@@ -1209,62 +1336,81 @@ class Dataset[T] private[sql](
CombineUnions(Union(logicalPlan, other.logicalPlan))
}
+ /**
+ * Returns a new [[Dataset]] containing union of rows in this frame and another frame.
+ * This is equivalent to `UNION ALL` in SQL.
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
def union(other: Dataset[T]): Dataset[T] = unionAll(other)
/**
- * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
+ * Returns a new [[Dataset]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
Intersect(logicalPlan, other.logicalPlan)
}
/**
- * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
+ * Returns a new [[Dataset]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
Except(logicalPlan, other.logicalPlan)
}
+ /**
+ * Returns a new [[Dataset]] containing rows in this frame but not in another frame.
+ * This is equivalent to `EXCEPT` in SQL.
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
def subtract(other: Dataset[T]): Dataset[T] = except(other)
/**
- * Returns a new [[DataFrame]] by sampling a fraction of rows.
+ * Returns a new [[Dataset]] by sampling a fraction of rows.
*
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
* @param seed Seed for sampling.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = withTypedPlan {
Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
}
/**
- * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
+ * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed.
*
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
sample(withReplacement, fraction, Utils.random.nextLong)
}
/**
- * Randomly splits this [[DataFrame]] with the provided weights.
+ * Randomly splits this [[Dataset]] with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1.
* @param seed Seed for sampling.
- * @group dfops
- * @since 1.4.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
// It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
@@ -1281,29 +1427,28 @@ class Dataset[T] private[sql](
}
/**
- * Randomly splits this [[DataFrame]] with the provided weights.
+ * Randomly splits this [[Dataset]] with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1.
- * @group dfops
- * @since 1.4.0
+ * @group typedrel
+ * @since 2.0.0
*/
def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
randomSplit(weights, Utils.random.nextLong)
}
/**
- * Randomly splits this [[DataFrame]] with the provided weights. Provided for the Python Api.
+ * Randomly splits this [[Dataset]] with the provided weights. Provided for the Python API.
*
* @param weights weights for splits, will be normalized if they don't sum to 1.
* @param seed Seed for sampling.
- * @group dfops
*/
private[spark] def randomSplit(weights: List[Double], seed: Long): Array[Dataset[T]] = {
randomSplit(weights.toArray, seed)
}
/**
- * (Scala-specific) Returns a new [[DataFrame]] where each row has been expanded to zero or more
+ * (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
* rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
* the input row are implicitly joined with each row that is output by the function.
*
@@ -1312,17 +1457,18 @@ class Dataset[T] private[sql](
*
* {{{
* case class Book(title: String, words: String)
- * val df: RDD[Book]
+ * val ds: Dataset[Book]
*
* case class Word(word: String)
- * val allWords = df.explode('words) {
+ * val allWords = ds.explode('words) {
* case Row(words: String) => words.split(" ").map(Word(_))
* }
*
* val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
@@ -1343,15 +1489,16 @@ class Dataset[T] private[sql](
}
/**
- * (Scala-specific) Returns a new [[DataFrame]] where a single column has been expanded to zero
+ * (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
* or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
* columns of the input row are implicitly joined with each value that is output by the function.
*
* {{{
- * df.explode("words", "word") {words: String => words.split(" ")}
+ * ds.explode("words", "word") {words: String => words.split(" ")}
* }}}
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
: DataFrame = {
@@ -1372,13 +1519,12 @@ class Dataset[T] private[sql](
}
}
- /////////////////////////////////////////////////////////////////////////////
-
/**
- * Returns a new [[DataFrame]] by adding a column or replacing the existing column that has
+ * Returns a new [[Dataset]] by adding a column or replacing the existing column that has
* the same name.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def withColumn(colName: String, col: Column): DataFrame = {
val resolver = sqlContext.sessionState.analyzer.resolver
@@ -1399,7 +1545,7 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] by adding a column with metadata.
+ * Returns a new [[Dataset]] by adding a column with metadata.
*/
private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
val resolver = sqlContext.sessionState.analyzer.resolver
@@ -1420,10 +1566,11 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with a column renamed.
+ * Returns a new [[Dataset]] with a column renamed.
* This is a no-op if the schema doesn't contain `existingName`.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def withColumnRenamed(existingName: String, newName: String): DataFrame = {
val resolver = sqlContext.sessionState.analyzer.resolver
@@ -1444,20 +1591,22 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with a column dropped.
+ * Returns a new [[Dataset]] with a column dropped.
* This is a no-op if the schema doesn't contain the column name.
- * @group dfops
- * @since 1.4.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def drop(colName: String): DataFrame = {
drop(Seq(colName) : _*)
}
/**
- * Returns a new [[DataFrame]] with columns dropped.
+ * Returns a new [[Dataset]] with columns dropped.
* This is a no-op if the schema doesn't contain the column name(s).
- * @group dfops
- * @since 1.6.0
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def drop(colNames: String*): DataFrame = {
@@ -1472,12 +1621,13 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with a column dropped.
+ * Returns a new [[Dataset]] with a column dropped.
* This version of drop accepts a Column rather than a name.
- * This is a no-op if the DataFrame doesn't have a column
+ * This is a no-op if the Dataset doesn't have a column
* with an equivalent expression.
- * @group dfops
- * @since 1.4.1
+ *
+ * @group untypedrel
+ * @since 2.0.0
*/
def drop(col: Column): DataFrame = {
val expression = col match {
@@ -1494,19 +1644,20 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+ * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
* This is an alias for `distinct`.
- * @group dfops
- * @since 1.4.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
/**
- * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only
+ * (Scala-specific) Returns a new [[Dataset]] with duplicate rows removed, considering only
* the subset of columns.
*
- * @group dfops
- * @since 1.4.0
+ * @group typedrel
+ * @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val groupCols = colNames.map(resolve)
@@ -1522,11 +1673,11 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] with duplicate rows removed, considering only
+ * Returns a new [[Dataset]] with duplicate rows removed, considering only
* the subset of columns.
*
- * @group dfops
- * @since 1.4.0
+ * @group typedrel
+ * @since 2.0.0
*/
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
@@ -1535,11 +1686,11 @@ class Dataset[T] private[sql](
* If no columns are given, this function computes statistics for all numerical columns.
*
* This function is meant for exploratory data analysis, as we make no guarantee about the
- * backward compatibility of the schema of the resulting [[DataFrame]]. If you want to
+ * backward compatibility of the schema of the resulting [[Dataset]]. If you want to
* programmatically compute summary statistics, use the `agg` function instead.
*
* {{{
- * df.describe("age", "height").show()
+ * ds.describe("age", "height").show()
*
* // output:
* // summary age height
@@ -1551,7 +1702,7 @@ class Dataset[T] private[sql](
* }}}
*
* @group action
- * @since 1.3.1
+ * @since 1.6.0
*/
@scala.annotation.varargs
def describe(cols: String*): DataFrame = withPlan {
@@ -1596,7 +1747,7 @@ class Dataset[T] private[sql](
* all the data is loaded into the driver's memory.
*
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def head(n: Int): Array[T] = withTypedCallback("head", limit(n)) { df =>
df.collect(needCallback = false)
@@ -1605,64 +1756,86 @@ class Dataset[T] private[sql](
/**
* Returns the first row.
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def head(): T = head(1).head
/**
* Returns the first row. Alias for head().
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def first(): T = head()
/**
* Concise syntax for chaining custom transformations.
* {{{
- * def featurize(ds: DataFrame) = ...
+ * def featurize(ds: Dataset[T]) = ...
*
- * df
+ * ds
* .transform(featurize)
* .transform(...)
* }}}
+ *
+ * @group func
* @since 1.6.0
*/
def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
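Illustrative usage of `transform` (a sketch, not part of this patch; assumes `import sqlContext.implicits._` for `toDS()` and the element encoders):
{{{
def doubled(ds: Dataset[Int]): Dataset[Int] = ds.map(_ * 2)
def positiveOnly(ds: Dataset[Int]): Dataset[Int] = ds.filter(_ > 0)

// Reusable transformations chain without intermediate variables.
val result = Seq(-1, 2, 3).toDS().transform(doubled).transform(positiveOnly)
}}}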
/**
+ * :: Experimental ::
* (Scala-specific)
* Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
/**
+ * :: Experimental ::
* (Java-specific)
* Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def filter(func: FilterFunction[T]): Dataset[T] = filter(t => func.call(t))
/**
+ * :: Experimental ::
* (Scala-specific)
* Returns a new [[Dataset]] that contains the result of applying `func` to each element.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
/**
+ * :: Experimental ::
* (Java-specific)
* Returns a new [[Dataset]] that contains the result of applying `func` to each element.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] =
map(t => func.call(t))(encoder)
/**
+ * :: Experimental ::
* (Scala-specific)
* Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sqlContext,
@@ -1671,30 +1844,42 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* (Java-specific)
* Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
mapPartitions(func)(encoder)
}
/**
+ * :: Experimental ::
* (Scala-specific)
* Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
* and then flattening the results.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
mapPartitions(_.flatMap(func))
/**
+ * :: Experimental ::
* (Java-specific)
* Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
* and then flattening the results.
+ *
+ * @group func
* @since 1.6.0
*/
+ @Experimental
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
val func: (T) => Iterator[U] = x => f.call(x).asScala
flatMap(func)(encoder)
@@ -1702,8 +1887,9 @@ class Dataset[T] private[sql](
/**
* Applies a function `f` to all rows.
- * @group rdd
- * @since 1.3.0
+ *
+ * @group action
+ * @since 1.6.0
*/
def foreach(f: T => Unit): Unit = withNewExecutionId {
rdd.foreach(f)
@@ -1712,14 +1898,17 @@ class Dataset[T] private[sql](
/**
* (Java-specific)
* Runs `func` on each element of this [[Dataset]].
+ *
+ * @group action
* @since 1.6.0
*/
def foreach(func: ForeachFunction[T]): Unit = foreach(func.call(_))
/**
- * Applies a function f to each partition of this [[DataFrame]].
- * @group rdd
- * @since 1.3.0
+ * Applies a function f to each partition of this [[Dataset]].
+ *
+ * @group action
+ * @since 1.6.0
*/
def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId {
rdd.foreachPartition(f)
@@ -1728,24 +1917,26 @@ class Dataset[T] private[sql](
/**
* (Java-specific)
* Runs `func` on each partition of this [[Dataset]].
+ *
+ * @group action
* @since 1.6.0
*/
def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
foreachPartition(it => func.call(it.asJava))
/**
- * Returns the first `n` rows in the [[DataFrame]].
+ * Returns the first `n` rows in the [[Dataset]].
*
* Running take requires moving data into the application's driver process, and doing so with
* a very large `n` can crash the driver process with OutOfMemoryError.
*
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def take(n: Int): Array[T] = head(n)
/**
- * Returns the first `n` rows in the [[DataFrame]] as a list.
+ * Returns the first `n` rows in the [[Dataset]] as a list.
*
* Running take requires moving data into the application's driver process, and doing so with
* a very large `n` can crash the driver process with OutOfMemoryError.
@@ -1756,7 +1947,7 @@ class Dataset[T] private[sql](
def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n) : _*)
/**
- * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
+ * Returns an array that contains all [[Row]]s in this [[Dataset]].
*
* Running collect requires moving all the data into the application's driver process, and
* doing so on a very large dataset can crash the driver process with OutOfMemoryError.
@@ -1764,18 +1955,18 @@ class Dataset[T] private[sql](
* For Java API, use [[collectAsList]].
*
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def collect(): Array[T] = collect(needCallback = true)
/**
- * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
+ * Returns a Java list that contains all [[Row]]s in this [[Dataset]].
*
* Running collect requires moving all the data into the application's driver process, and
* doing so on a very large dataset can crash the driver process with OutOfMemoryError.
*
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ =>
withNewExecutionId {
@@ -1797,31 +1988,32 @@ class Dataset[T] private[sql](
}
/**
- * Returns the number of rows in the [[DataFrame]].
+ * Returns the number of rows in the [[Dataset]].
* @group action
- * @since 1.3.0
+ * @since 1.6.0
*/
def count(): Long = withCallback("count", groupBy().count()) { df =>
df.collect(needCallback = false).head.getLong(0)
}
/**
- * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
- * @group dfops
- * @since 1.3.0
+ * Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+ *
+ * @group typedrel
+ * @since 1.6.0
*/
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = true, logicalPlan)
}
/**
- * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
- * `numPartitions`. The resulting DataFrame is hash partitioned.
+ * Returns a new [[Dataset]] partitioned by the given partitioning expressions into
+ * `numPartitions`. The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
- * @group dfops
- * @since 1.6.0
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
@@ -1829,13 +2021,13 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
- * the existing number of partitions. The resulting DataFrame is hash partitioned.
+ * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
+ * the existing number of partitions. The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
- * @group dfops
- * @since 1.6.0
+ * @group typedrel
+ * @since 2.0.0
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
@@ -1843,29 +2035,32 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+ * Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
+ *
* @group rdd
- * @since 1.4.0
+ * @since 1.6.0
*/
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, logicalPlan)
}
/**
- * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+ * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
* This is an alias for `dropDuplicates`.
- * @group dfops
- * @since 1.3.0
+ *
+ * @group typedrel
+ * @since 2.0.0
*/
def distinct(): Dataset[T] = dropDuplicates()
/**
- * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
+ * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`).
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def persist(): this.type = {
sqlContext.cacheManager.cacheQuery(this)
@@ -1873,19 +2068,21 @@ class Dataset[T] private[sql](
}
/**
- * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
+ * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`).
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def cache(): this.type = persist()
/**
- * Persist this [[DataFrame]] with the given storage level.
+ * Persist this [[Dataset]] with the given storage level.
* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
* `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
* `MEMORY_AND_DISK_2`, etc.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def persist(newLevel: StorageLevel): this.type = {
sqlContext.cacheManager.cacheQuery(this, None, newLevel)
@@ -1893,10 +2090,12 @@ class Dataset[T] private[sql](
}
/**
- * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
+ * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk.
+ *
* @param blocking Whether to block until all blocks are deleted.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def unpersist(blocking: Boolean): this.type = {
sqlContext.cacheManager.tryUncacheQuery(this, blocking)
@@ -1904,51 +2103,47 @@ class Dataset[T] private[sql](
}
/**
- * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
+ * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk.
+ *
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def unpersist(): this.type = unpersist(blocking = false)
- /////////////////////////////////////////////////////////////////////////////
- // I/O
- /////////////////////////////////////////////////////////////////////////////
-
/**
- * Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is
+ * Represents the content of the [[Dataset]] as an [[RDD]] of [[Row]]s. Note that the RDD is
* memoized. Once called, it won't change even if you change any query planning related Spark SQL
* configurations (e.g. `spark.sql.shuffle.partitions`).
+ *
* @group rdd
- * @since 1.3.0
+ * @since 1.6.0
*/
lazy val rdd: RDD[T] = {
- // use a local variable to make sure the map closure doesn't capture the whole DataFrame
- val schema = this.schema
queryExecution.toRdd.mapPartitions { rows =>
rows.map(boundTEncoder.fromRow)
}
}
/**
- * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ * Returns the content of the [[Dataset]] as a [[JavaRDD]] of [[Row]]s.
* @group rdd
- * @since 1.3.0
+ * @since 1.6.0
*/
def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD()
/**
- * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ * Returns the content of the [[Dataset]] as a [[JavaRDD]] of [[Row]]s.
* @group rdd
- * @since 1.3.0
+ * @since 1.6.0
*/
def javaRDD: JavaRDD[T] = toJavaRDD
/**
- * Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this
- * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
+ * Registers this [[Dataset]] as a temporary table using the given name. The lifetime of this
+ * temporary table is tied to the [[SQLContext]] that was used to create this Dataset.
*
* @group basic
- * @since 1.3.0
+ * @since 1.6.0
*/
def registerTempTable(tableName: String): Unit = {
sqlContext.registerDataFrameAsTable(toDF(), tableName)
@@ -1956,18 +2151,19 @@ class Dataset[T] private[sql](
/**
* :: Experimental ::
- * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
+ * Interface for saving the content of the [[Dataset]] out into external storage or streams.
*
* @group output
- * @since 1.4.0
+ * @since 1.6.0
*/
@Experimental
def write: DataFrameWriter = new DataFrameWriter(toDF())
/**
- * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
- * @group rdd
- * @since 1.3.0
+ * Returns the content of the [[Dataset]] as a [[Dataset]] of JSON strings.
+ *
+ * @group basic
+ * @since 1.6.0
*/
def toJSON: Dataset[String] = {
val rowSchema = this.schema
@@ -1998,9 +2194,12 @@ class Dataset[T] private[sql](
}
/**
- * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+ * Returns a best-effort snapshot of the files that compose this Dataset. This method simply
* asks each constituent BaseRelation for its respective files and takes the union of all results.
* Depending on the source relations, this may not find all input files. Duplicates are removed.
+ *
+ * @group basic
+ * @since 2.0.0
*/
def inputFiles: Array[String] = {
val files: Seq[String] = logicalPlan.collect {
@@ -2013,7 +2212,7 @@ class Dataset[T] private[sql](
}
////////////////////////////////////////////////////////////////////////////
- // for Python API
+ // For Python API
////////////////////////////////////////////////////////////////////////////
/**
@@ -2031,8 +2230,12 @@ class Dataset[T] private[sql](
}
}
+ ////////////////////////////////////////////////////////////////////////////
+ // Private Helpers
+ ////////////////////////////////////////////////////////////////////////////
+
/**
- * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
+ * Wrap a Dataset action to track all Spark jobs in the body so that we can connect them with
* an execution.
*/
private[sql] def withNewExecutionId[U](body: => U): U = {
@@ -2040,7 +2243,7 @@ class Dataset[T] private[sql](
}
/**
- * Wrap a DataFrame action to track the QueryExecution and time cost, then report to the
+ * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
@@ -2096,7 +2299,7 @@ class Dataset[T] private[sql](
Dataset.newDataFrame(sqlContext, logicalPlan)
}
- /** A convenient function to wrap a logical plan and produce a DataFrame. */
+ /** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan(logicalPlan: => LogicalPlan): Dataset[T] = {
new Dataset[T](sqlContext, logicalPlan, encoder)
}