author    Reynold Xin <rxin@databricks.com>  2016-03-18 00:57:23 -0700
committer Reynold Xin <rxin@databricks.com>  2016-03-18 00:57:23 -0700
commit    bb1fda01fe620422c442b305f5ceeb552871a490 (patch)
tree      2997457d5263726df9a9dbc44a17e196a018fe87
parent    750ed64cd9db4f81a53caaf1fd6c8a6a0c07887d (diff)
[SPARK-13826][SQL] Addendum: update documentation for Datasets
## What changes were proposed in this pull request?

This patch updates the documentation for Datasets. I also updated some internal documentation for exchange/broadcast.

## How was this patch tested?

Just a documentation/API stability update.

Author: Reynold Xin <rxin@databricks.com>

Closes #11814 from rxin/dataset-docs.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                       11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                               81
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala            6
4 files changed, 70 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9951f0fabf..7ed1c51360 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -138,7 +138,16 @@ final class DataFrameWriter private[sql](df: DataFrame) {
/**
* Partitions the output by the given columns on the file system. If specified, the output is
- * laid out on the file system similar to Hive's partitioning scheme.
+ * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+ * partition a dataset by year and then month, the directory layout would look like:
+ *
+ * - year=2016/month=01/
+ * - year=2016/month=02/
+ *
+ * Partitioning is one of the most widely used techniques to optimize physical data layout.
+ * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+ * predicates on the partitioned columns. In order for partitioning to work well, the number
+ * of distinct values in each column should typically be less than tens of thousands.
*
* This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and Avro as well.
*
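
A hedged sketch of the layout described above; the DataFrame name and output path are illustrative:
{{{
// assumes an existing DataFrame `df` with "year" and "month" columns
df.write
  .partitionBy("year", "month")
  .parquet("/data/events")  // yields /data/events/year=2016/month=01/, ...
}}}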
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 209bac3629..39f7f35def 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,36 +61,48 @@ private[sql] object Dataset {
}
/**
- * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
- * using functional or relational operations.
+ * A [[Dataset]] is a strongly typed collection of domain-specific objects that can be transformed
+ * in parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a [[DataFrame]], which is a Dataset of [[Row]].
*
- * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (groupBy).
+ * Example actions include count, show, and writing data out to file systems.
*
- * - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
- * in the encoded form. This representation allows for additional logical operations and
- * enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
- * an object.
- * - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
- * used to serialize the object into a binary format. Encoders are also capable of mapping the
- * schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
- * reflection based serialization. Operations that change the type of object stored in the
- * dataset also need an encoder for the new type.
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
+ * a Dataset represents a logical plan that describes the computation required to produce the data.
+ * When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
+ * physical plan for efficient execution in a parallel or distributed manner. To explore the
+ * logical plan as well as the optimized physical plan, use the `explain` function.
*
- * Different from DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
- * a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a
- * generic [[Row]] container. Since Spark 2.0.0, DataFrame is simply a type alias of
- * `Dataset[Row]`.
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain-specific type T to Spark's internal type system. For example, given a class Person
+ * with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code
+ * at runtime to serialize the Person object into a binary structure. This binary structure often
+ * has a much lower memory footprint and is optimized for efficiency in data processing
+ * (e.g. in a columnar format). To understand the internal binary representation of the data, use the
+ * `schema` function.
*
- * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark
+ * to some files on storage systems, using the `read` function available on a `SparkSession`.
* {{{
- * val people = sqlContext.read.parquet("...") // in Scala
- * Dataset<Row> people = sqlContext.read().parquet("...") // in Java
+ * val people = session.read.parquet("...").as[Person] // Scala
+ * Dataset<Person> people = session.read().parquet("...").as(Encoders.bean(Person.class)); // Java
* }}}
*
- * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
+ * Datasets can also be created through transformations available on existing Datasets. For example,
+ * the following creates a new Dataset by applying a map function to the existing one:
+ * {{{
+ * val names = people.map(_.name) // in Scala; names is a Dataset[String]
+ * Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING()); // in Java 8
+ * }}}
*
- * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
+ * Dataset operations can also be untyped, through the various domain-specific-language (DSL)
+ * functions defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. These operations
+ * are very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use the `apply` method in Scala and `col` in Java.
* {{{
* val ageCol = people("age") // in Scala
* Column ageCol = people.col("age") // in Java
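
Tying the above together, a minimal hedged sketch of creation, a lazy transformation, and an action; the Person class, file path, and `session` value are assumptions for illustration:
{{{
case class Person(name: String, age: Int)

import session.implicits._  // assumed import providing the encoder for Person

val people = session.read.parquet("/data/people").as[Person]  // typed Dataset[Person]
val adults = people.filter(_.age >= 21)                       // lazy transformation
adults.explain()                                              // inspect logical/physical plans
println(adults.count())                                       // action: triggers execution
}}}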
@@ -241,7 +253,6 @@ class Dataset[T] private[sql](
}
/**
- * :: Experimental ::
* Converts this strongly typed collection of data to a generic DataFrame. In contrast to the
* strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
* objects that allow fields to be accessed by ordinal or name.
@@ -251,7 +262,6 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
- @Experimental
def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
/**
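
For illustration, converting between the typed and untyped views; a hedged sketch assuming the hypothetical Person class above and imported implicits:
{{{
val ds: Dataset[Person] = Seq(Person("Ann", 30)).toDS()
val df: DataFrame = ds.toDF()  // untyped view; fields become columns "name" and "age"
}}}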
@@ -1094,7 +1104,7 @@ class Dataset[T] private[sql](
def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
/**
- * Groups the [[Dataset]] 2.0.0
+ * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* This is a variant of groupBy that can only group by existing columns using column names
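
A small hedged usage sketch of the variant documented above (the column name is assumed):
{{{
val countsByAge = people.groupBy("age").count()  // DataFrame with "age" and "count" columns
}}}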
@@ -1314,7 +1324,8 @@ class Dataset[T] private[sql](
/**
* Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
- * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
+ * and `head` is that `head` is an action and returns an array (by triggering query execution)
+ * while `limit` returns a new [[Dataset]].
*
* @group typedrel
* @since 2.0.0
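
The action-versus-transformation distinction shows up in the result types; a hedged sketch using the hypothetical `people` Dataset:
{{{
val firstTen: Dataset[Person] = people.limit(10)  // lazy: returns a new Dataset
val headTen: Array[Person] = people.head(10)      // action: triggers query execution
}}}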
@@ -1327,6 +1338,9 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
*
+ * To do a SQL-style set union (that does deduplication of elements), use this function followed
+ * by a [[distinct]].
+ *
* @group typedrel
* @since 2.0.0
*/
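
A hedged sketch of the suggested pattern, assuming two Datasets of the same type and that the operator carrying these UNION ALL semantics is named `unionAll` in this version:
{{{
val bag = ds1.unionAll(ds2)             // UNION ALL: keeps duplicates
val set = ds1.unionAll(ds2).distinct()  // SQL-style UNION: deduplicated
}}}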
@@ -1349,6 +1363,9 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
*
+ * Note that equality checking is performed directly on the encoded representation of the data
+ * and thus is not affected by a custom `equals` function defined on `T`.
+ *
* @group typedrel
* @since 1.6.0
*/
@@ -1360,6 +1377,9 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
*
+ * Note that equality checking is performed directly on the encoded representation of the data
+ * and thus is not affected by a custom `equals` function defined on `T`.
+ *
* @group typedrel
* @since 2.0.0
*/
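
To make the caveat concrete, a hedged sketch (ds1 and ds2 assumed to share the element type T):
{{{
val common = ds1.intersect(ds2)  // INTERSECT: rows present in both
val onlyIn1 = ds1.except(ds2)    // EXCEPT: rows in ds1 but not in ds2
// Equality is checked on the encoded binary form of each row, so a
// custom T.equals that ignores a field does not change these results.
}}}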
@@ -1448,6 +1468,7 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
* rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
* the input row are implicitly joined with each row that is output by the function.
@@ -1470,6 +1491,7 @@ class Dataset[T] private[sql](
* @group untypedrel
* @since 2.0.0
*/
+ @Experimental
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
@@ -1489,6 +1511,7 @@ class Dataset[T] private[sql](
}
/**
+ * :: Experimental ::
* (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
* or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
* columns of the input row are implicitly joined with each value that is output by the function.
@@ -1500,6 +1523,7 @@ class Dataset[T] private[sql](
* @group untypedrel
* @since 2.0.0
*/
+ @Experimental
def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
: DataFrame = {
val dataType = ScalaReflection.schemaFor[B].dataType
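
A hedged usage sketch of the single-column variant (the column names are assumptions):
{{{
// expands each space-separated "line" value into one output row per word
val words: DataFrame = df.explode("line", "word") { line: String =>
  line.split(" ").toSeq
}
}}}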
@@ -1770,7 +1794,7 @@ class Dataset[T] private[sql](
/**
* Concise syntax for chaining custom transformations.
* {{{
- * def featurize(ds: Dataset[T]) = ...
+ * def featurize(ds: Dataset[T]): Dataset[U] = ...
*
* ds
* .transform(featurize)
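
For instance, a hedged featurize implementation; the added column and the use of the `length` function are illustrative:
{{{
import org.apache.spark.sql.functions.length

def featurize(ds: Dataset[Person]): DataFrame =
  ds.withColumn("nameLen", length(ds("name")))

people.transform(featurize)  // returns a Dataset[Row], i.e. a DataFrame
}}}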
@@ -2051,6 +2075,9 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
* This is an alias for `dropDuplicates`.
*
+ * Note that equality checking is performed directly on the encoded representation of the data
+ * and thus is not affected by a custom `equals` function defined on `T`.
+ *
* @group typedrel
* @since 2.0.0
*/
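
A one-line hedged sketch of the alias:
{{{
val uniquePeople = people.distinct()  // dedup on the encoded form, not Person.equals
}}}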
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 1a5c6a66c4..102a9356df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,9 +23,8 @@ import scala.concurrent.duration._
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.util.ThreadUtils
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 9eaadea1b1..df7ad48812 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -30,7 +30,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
/**
- * An interface for exchanges.
+ * Base class for operators that exchange data among multiple threads or processes.
+ *
+ * Exchanges are the key class of operators that enable parallelism. Although the implementation
+ * differs significantly, the concept is similar to the exchange operator described in
+ * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
abstract class Exchange extends UnaryNode {
override def output: Seq[Attribute] = child.output