author     Reynold Xin <rxin@databricks.com>    2015-02-08 18:56:51 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-08 18:56:51 -0800
commit  a052ed42501fee3641348337505b6176426653c4 (patch)
tree    e708efe88401e54d61374b20e588b266c7836050 /sql/core
parent  56aff4bd6c7c9d18f4f962025708f20a4a82dcf0 (diff)
[SPARK-5643][SQL] Add a show method to print the content of a DataFrame in tabular format.
An example:
```
year  month  AVG('Adj Close)  MAX('Adj Close)
1980  12     0.503218         0.595103
1981  01     0.523289         0.570307
1982  02     0.436504         0.475256
1983  03     0.410516         0.442194
1984  04     0.450090         0.483521
```

Author: Reynold Xin <rxin@databricks.com>

Closes #4416 from rxin/SPARK-5643 and squashes the following commits:

d0e0d6e [Reynold Xin] [SQL] Minor update to data source and statistics documentation.
269da83 [Reynold Xin] Updated isLocal comment.
2cf3c27 [Reynold Xin] Moved logic into optimizer.
1a04d8b [Reynold Xin] [SPARK-5643][SQL] Add a show method to print the content of a DataFrame in columnar format.
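For context, a minimal usage sketch of the new method (the `df` value is assumed here and is not part of this commit):

```scala
// Hypothetical usage; `df` is any DataFrame already in scope.
df.show()   // prints the first 20 rows in the tabular form shown above
```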
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                 | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala             | 41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala        |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala        | 15
5 files changed, 72 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8ad6526f87..17ea3cde8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -102,7 +102,7 @@ trait DataFrame extends RDDApi[Row] {
* }}}
*/
@scala.annotation.varargs
- def toDataFrame(colName: String, colNames: String*): DataFrame
+ def toDataFrame(colNames: String*): DataFrame
/** Returns the schema of this [[DataFrame]]. */
def schema: StructType
@@ -117,6 +117,25 @@ trait DataFrame extends RDDApi[Row] {
def printSchema(): Unit
/**
+ * Returns true if the `collect` and `take` methods can be run locally
+ * (without any Spark executors).
+ */
+ def isLocal: Boolean
+
+ /**
+ * Displays the [[DataFrame]] in a tabular form. For example:
+ * {{{
+ * year month AVG('Adj Close) MAX('Adj Close)
+ * 1980 12 0.503218 0.595103
+ * 1981 01 0.523289 0.570307
+ * 1982 02 0.436504 0.475256
+ * 1983 03 0.410516 0.442194
+ * 1984 04 0.450090 0.483521
+ * }}}
+ */
+ def show(): Unit
+
+ /**
* Cartesian join with another [[DataFrame]].
*
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
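The new `isLocal` flag makes it possible to branch on whether a result can be materialized in the driver. A small sketch of that (the `df` value is assumed, not part of this patch):

```scala
// Sketch: `df` is any DataFrame. isLocal is true only when the plan is a
// LocalRelation, so collect()/take() can run in the driver without executors.
if (df.isLocal) {
  val rows = df.collect()   // no jobs scheduled on the cluster
  rows.foreach(println)
} else {
  df.show()                 // triggers a take(20) that runs on the executors
}
```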
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 789bcf6184..fa05a5dcac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -90,14 +90,13 @@ private[sql] class DataFrameImpl protected[sql](
}
}
- override def toDataFrame(colName: String, colNames: String*): DataFrame = {
- val newNames = colName +: colNames
- require(schema.size == newNames.size,
+ override def toDataFrame(colNames: String*): DataFrame = {
+ require(schema.size == colNames.size,
"The number of columns doesn't match.\n" +
"Old column names: " + schema.fields.map(_.name).mkString(", ") + "\n" +
- "New column names: " + newNames.mkString(", "))
+ "New column names: " + colNames.mkString(", "))
- val newCols = schema.fieldNames.zip(newNames).map { case (oldName, newName) =>
+ val newCols = schema.fieldNames.zip(colNames).map { case (oldName, newName) =>
apply(oldName).as(newName)
}
select(newCols :_*)
@@ -113,6 +112,38 @@ private[sql] class DataFrameImpl protected[sql](
override def printSchema(): Unit = println(schema.treeString)
+ override def isLocal: Boolean = {
+ logicalPlan.isInstanceOf[LocalRelation]
+ }
+
+ override def show(): Unit = {
+ val data = take(20)
+ val numCols = schema.fieldNames.length
+
+ // For cells that are beyond 20 characters, replace it with the first 17 and "..."
+ val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
+ row.toSeq.map { cell =>
+ val str = if (cell == null) "null" else cell.toString
+ if (str.length > 20) str.substring(0, 17) + "..." else str
+ } : Seq[String]
+ }
+
+ // Compute the width of each column
+ val colWidths = Array.fill(numCols)(0)
+ for (row <- rows) {
+ for ((cell, i) <- row.zipWithIndex) {
+ colWidths(i) = math.max(colWidths(i), cell.length)
+ }
+ }
+
+ // Pad the cells and print them
+ println(rows.map { row =>
+ row.zipWithIndex.map { case (cell, i) =>
+ String.format(s"%-${colWidths(i)}s", cell)
+ }.mkString(" ")
+ }.mkString("\n"))
+ }
+
override def join(right: DataFrame): DataFrame = {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}
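The formatting logic in `show()` above is self-contained enough to restate outside Spark. A standalone sketch (the object and method names below are mine, not Spark's) that reproduces the truncate-then-pad behaviour:

```scala
// Standalone sketch of the tabular formatting used by show():
// cells longer than 20 characters are cut to 17 plus "...", each column is as
// wide as its longest cell, and every cell is left-aligned with padding.
object TabularFormat {
  def format(header: Seq[String], data: Seq[Seq[Any]]): String = {
    val rows: Seq[Seq[String]] = header +: data.map(_.map { cell =>
      val str = if (cell == null) "null" else cell.toString
      if (str.length > 20) str.substring(0, 17) + "..." else str
    })
    // Column width = length of the longest cell in that column (header included).
    val colWidths: Seq[Int] = rows.transpose.map(_.map(_.length).max)
    rows.map { row =>
      row.zip(colWidths).map { case (cell, width) => cell.padTo(width, ' ') }.mkString(" ")
    }.mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    println(format(Seq("year", "month"), Seq(Seq(1980, 12), Seq(1981, 1))))
  }
}
```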
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 6043fb4dee..782f6e28ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -48,7 +48,7 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
protected[sql] override def logicalPlan: LogicalPlan = err()
- override def toDataFrame(colName: String, colNames: String*): DataFrame = err()
+ override def toDataFrame(colNames: String*): DataFrame = err()
override def schema: StructType = err()
@@ -58,6 +58,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def printSchema(): Unit = err()
+ override def show(): Unit = err()
+
+ override def isLocal: Boolean = false
+
override def join(right: DataFrame): DataFrame = err()
override def join(right: DataFrame, joinExprs: Column): DataFrame = err()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 66aed5d511..4dc506c21a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.execution
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.runtime.universe.TypeTag
-
import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
@@ -40,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
@transient lazy val buildProjection = newMutableProjection(projectList, child.output)
- def execute() = child.execute().mapPartitions { iter =>
+ override def execute() = child.execute().mapPartitions { iter =>
val resuableProjection = buildProjection()
iter.map(resuableProjection)
}
@@ -55,7 +52,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
@transient lazy val conditionEvaluator = newPredicate(condition, child.output)
- def execute() = child.execute().mapPartitions { iter =>
+ override def execute() = child.execute().mapPartitions { iter =>
iter.filter(conditionEvaluator)
}
}
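The only change to Project and Filter here is spelling out `override` on execute(). A toy sketch (class names are illustrative, not Spark's) of why that is worthwhile even though the parent method is abstract and the keyword is technically optional:

```scala
// Sketch: an explicit `override` documents intent and fails fast on drift.
abstract class PlanNode {
  def execute(): Iterator[Int]
}

class MyProject extends PlanNode {
  // With `override`, a later change to PlanNode.execute()'s signature makes this
  // definition fail with "method execute overrides nothing", pointing straight at
  // the stale implementation; without it the error surfaces less directly, as an
  // unimplemented abstract member on the subclass.
  override def execute(): Iterator[Int] = Iterator(1, 2, 3)
}
```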
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a640ba57e0..5eecc303ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -87,13 +87,13 @@ trait CreatableRelationProvider {
/**
* ::DeveloperApi::
- * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
- * be able to produce the schema of their data in the form of a [[StructType]] Concrete
+ * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
+ * be able to produce the schema of their data in the form of a [[StructType]]. Concrete
* implementation should inherit from one of the descendant `Scan` classes, which define various
* abstract methods for execution.
*
* BaseRelations must also define a equality function that only returns true when the two
- * instances will return the same data. This equality function is used when determining when
+ * instances will return the same data. This equality function is used when determining when
* it is safe to substitute cached results for a given relation.
*/
@DeveloperApi
@@ -102,13 +102,16 @@ abstract class BaseRelation {
def schema: StructType
/**
- * Returns an estimated size of this relation in bytes. This information is used by the planner
+ * Returns an estimated size of this relation in bytes. This information is used by the planner
* to decided when it is safe to broadcast a relation and can be overridden by sources that
* know the size ahead of time. By default, the system will assume that tables are too
- * large to broadcast. This method will be called multiple times during query planning
+ * large to broadcast. This method will be called multiple times during query planning
* and thus should not perform expensive operations for each invocation.
+ *
+ * Note that it is always better to overestimate size than underestimate, because underestimation
+ * could lead to execution plans that are suboptimal (i.e. broadcasting a very large table).
*/
- def sizeInBytes = sqlContext.conf.defaultSizeInBytes
+ def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
}
/**
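To ground the sizeInBytes and equality requirements above, here is a minimal sketch of a relation backed by a single local file. The class name, the file-backed design, and the one-column schema are invented for illustration; a real source would also mix in one of the `Scan` traits mentioned earlier:

```scala
import java.io.File

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical file-backed relation illustrating the BaseRelation contract above.
class SingleFileRelation(val path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation {

  override def schema: StructType = StructType(StructField("line", StringType) :: Nil)

  // Report the actual on-disk size instead of the conservative default, so the
  // planner can choose to broadcast this relation when the file is small.
  // When in doubt, overestimating is safer than underestimating (see above).
  override def sizeInBytes: Long = new File(path).length()

  // Value-based equality: two instances reading the same file return the same
  // data, which lets cached results be substituted for either of them.
  override def equals(other: Any): Boolean = other match {
    case that: SingleFileRelation => this.path == that.path
    case _ => false
  }
  override def hashCode(): Int = path.hashCode
}
```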