author    Reynold Xin <rxin@databricks.com>    2015-02-16 19:00:30 -0800
committer Reynold Xin <rxin@databricks.com>    2015-02-16 19:00:30 -0800
commit    0e180bfc3c7f18780d4fc4f42681609832418e43 (patch)
tree      e87dc86f26072db58ec4e71030c782d197e65e46 /sql
parent    58a82a7882d7a8a7e4064278c4bf28607d9a42ba (diff)
[SQL] Various DataFrame doc changes.
Added a bunch of tags. Also changed parquetFile to take varargs rather than a string followed by varargs.

Author: Reynold Xin <rxin@databricks.com>

Closes #4636 from rxin/df-doc and squashes the following commits:

651f80c [Reynold Xin] Fixed parquetFile in PySpark.
8dc3024 [Reynold Xin] [SQL] Various DataFrame doc changes.
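The main API change described above, `parquetFile` taking pure varargs, can be sketched as follows (a minimal usage sketch, assuming an existing `sqlContext` and illustrative file paths):

{{{
  // Before this commit: parquetFile(path: String, paths: String*)
  // sqlContext.parquetFile("data/part-0.parquet", "data/part-1.parquet")

  // After this commit: parquetFile(paths: String*), so a Seq can be expanded directly.
  val paths = Seq("data/part-0.parquet", "data/part-1.parquet")  // illustrative paths
  val people = sqlContext.parquetFile(paths: _*)

  // With no arguments, the new signature returns an empty DataFrame
  // (see the SQLContext diff below).
  val empty = sqlContext.parquetFile()
}}}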
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Column.scala              165
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala            14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala         4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala           3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala           97
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala       4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala           226
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   3
8 files changed, 433 insertions, 83 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index f6ecee1af8..8b6241c213 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import scala.language.implicitConversions
+import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
@@ -41,21 +42,15 @@ private[sql] object Column {
/**
+ * :: Experimental ::
* A column in a [[DataFrame]].
*
- * `Column` instances can be created by:
- * {{{
- * // 1. Select a column out of a DataFrame
- * df("colName")
- *
- * // 2. Create a literal expression
- * Literal(1)
- *
- * // 3. Create new columns from
- * }}}
- *
+ * @groupname java_expr_ops Java-specific expression operators.
+ * @groupname expr_ops Expression operators.
+ * @groupname df_ops DataFrame functions.
+ * @groupname Ungrouped Support functions for DataFrames.
*/
-// TODO: Improve documentation.
+@Experimental
trait Column extends DataFrame {
protected[sql] def expr: Expression
@@ -129,6 +124,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.select( negate(col("amount")) );
* }}}
+ *
+ * @group expr_ops
*/
def unary_- : Column = exprToColumn(UnaryMinus(expr))
@@ -142,6 +139,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.filter( not(df.col("isActive")) );
* }}}
+ *
+ * @group expr_ops
*/
def unary_! : Column = exprToColumn(Not(expr))
@@ -155,6 +154,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.filter( col("colA").equalTo(col("colB")) );
* }}}
+ *
+ * @group expr_ops
*/
def === (other: Any): Column = constructColumn(other) { o =>
EqualTo(expr, o.expr)
@@ -170,6 +171,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.filter( col("colA").equalTo(col("colB")) );
* }}}
+ *
+ * @group expr_ops
*/
def equalTo(other: Any): Column = this === other
@@ -184,6 +187,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.filter( col("colA").notEqual(col("colB")) );
* }}}
+ *
+ * @group expr_ops
*/
def !== (other: Any): Column = constructColumn(other) { o =>
Not(EqualTo(expr, o.expr))
@@ -200,6 +205,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* df.filter( col("colA").notEqual(col("colB")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def notEqual(other: Any): Column = constructColumn(other) { o =>
Not(EqualTo(expr, o.expr))
@@ -215,6 +222,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* people.select( people("age").gt(21) );
* }}}
+ *
+ * @group expr_ops
*/
def > (other: Any): Column = constructColumn(other) { o =>
GreaterThan(expr, o.expr)
@@ -230,6 +239,8 @@ trait Column extends DataFrame {
* import static org.apache.spark.sql.functions.*;
* people.select( people("age").gt(21) );
* }}}
+ *
+ * @group java_expr_ops
*/
def gt(other: Any): Column = this > other
@@ -242,6 +253,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").lt(21) );
* }}}
+ *
+ * @group expr_ops
*/
def < (other: Any): Column = constructColumn(other) { o =>
LessThan(expr, o.expr)
@@ -256,6 +269,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").lt(21) );
* }}}
+ *
+ * @group java_expr_ops
*/
def lt(other: Any): Column = this < other
@@ -268,6 +283,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").leq(21) );
* }}}
+ *
+ * @group expr_ops
*/
def <= (other: Any): Column = constructColumn(other) { o =>
LessThanOrEqual(expr, o.expr)
@@ -282,6 +299,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").leq(21) );
* }}}
+ *
+ * @group java_expr_ops
*/
def leq(other: Any): Column = this <= other
@@ -294,6 +313,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").geq(21) )
* }}}
+ *
+ * @group expr_ops
*/
def >= (other: Any): Column = constructColumn(other) { o =>
GreaterThanOrEqual(expr, o.expr)
@@ -308,11 +329,15 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("age").geq(21) )
* }}}
+ *
+ * @group java_expr_ops
*/
def geq(other: Any): Column = this >= other
/**
* Equality test that is safe for null values.
+ *
+ * @group expr_ops
*/
def <=> (other: Any): Column = constructColumn(other) { o =>
EqualNullSafe(expr, o.expr)
@@ -320,16 +345,22 @@ trait Column extends DataFrame {
/**
* Equality test that is safe for null values.
+ *
+ * @group java_expr_ops
*/
def eqNullSafe(other: Any): Column = this <=> other
/**
* True if the current expression is null.
+ *
+ * @group expr_ops
*/
def isNull: Column = exprToColumn(IsNull(expr))
/**
* True if the current expression is NOT null.
+ *
+ * @group expr_ops
*/
def isNotNull: Column = exprToColumn(IsNotNull(expr))
@@ -342,6 +373,8 @@ trait Column extends DataFrame {
* // Java:
* people.filter( people("inSchool").or(people("isEmployed")) );
* }}}
+ *
+ * @group expr_ops
*/
def || (other: Any): Column = constructColumn(other) { o =>
Or(expr, o.expr)
@@ -356,6 +389,8 @@ trait Column extends DataFrame {
* // Java:
* people.filter( people("inSchool").or(people("isEmployed")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def or(other: Column): Column = this || other
@@ -368,6 +403,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("inSchool").and(people("isEmployed")) );
* }}}
+ *
+ * @group expr_ops
*/
def && (other: Any): Column = constructColumn(other) { o =>
And(expr, o.expr)
@@ -382,6 +419,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("inSchool").and(people("isEmployed")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def and(other: Column): Column = this && other
@@ -394,6 +433,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").plus(people("weight")) );
* }}}
+ *
+ * @group expr_ops
*/
def + (other: Any): Column = constructColumn(other) { o =>
Add(expr, o.expr)
@@ -408,6 +449,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").plus(people("weight")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def plus(other: Any): Column = this + other
@@ -420,6 +463,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").minus(people("weight")) );
* }}}
+ *
+ * @group expr_ops
*/
def - (other: Any): Column = constructColumn(other) { o =>
Subtract(expr, o.expr)
@@ -434,6 +479,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").minus(people("weight")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def minus(other: Any): Column = this - other
@@ -446,6 +493,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").multiply(people("weight")) );
* }}}
+ *
+ * @group expr_ops
*/
def * (other: Any): Column = constructColumn(other) { o =>
Multiply(expr, o.expr)
@@ -460,6 +509,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").multiply(people("weight")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def multiply(other: Any): Column = this * other
@@ -472,6 +523,8 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").divide(people("weight")) );
* }}}
+ *
+ * @group expr_ops
*/
def / (other: Any): Column = constructColumn(other) { o =>
Divide(expr, o.expr)
@@ -486,11 +539,15 @@ trait Column extends DataFrame {
* // Java:
* people.select( people("height").divide(people("weight")) );
* }}}
+ *
+ * @group java_expr_ops
*/
def divide(other: Any): Column = this / other
/**
* Modulo (a.k.a. remainder) expression.
+ *
+ * @group expr_ops
*/
def % (other: Any): Column = constructColumn(other) { o =>
Remainder(expr, o.expr)
@@ -498,29 +555,47 @@ trait Column extends DataFrame {
/**
* Modulo (a.k.a. remainder) expression.
+ *
+ * @group java_expr_ops
*/
def mod(other: Any): Column = this % other
/**
* A boolean expression that is evaluated to true if the value of this expression is contained
* by the evaluated values of the arguments.
+ *
+ * @group expr_ops
*/
@scala.annotation.varargs
def in(list: Column*): Column = {
new IncomputableColumn(In(expr, list.map(_.expr)))
}
+ /**
+ * SQL like expression.
+ *
+ * @group expr_ops
+ */
def like(literal: String): Column = exprToColumn(Like(expr, lit(literal).expr))
+ /**
+ * SQL RLIKE expression (LIKE with Regex).
+ *
+ * @group expr_ops
+ */
def rlike(literal: String): Column = exprToColumn(RLike(expr, lit(literal).expr))
/**
* An expression that gets an item at position `ordinal` out of an array.
+ *
+ * @group expr_ops
*/
def getItem(ordinal: Int): Column = exprToColumn(GetItem(expr, Literal(ordinal)))
/**
* An expression that gets a field by name in a [[StructField]].
+ *
+ * @group expr_ops
*/
def getField(fieldName: String): Column = exprToColumn(UnresolvedGetField(expr, fieldName))
@@ -528,6 +603,8 @@ trait Column extends DataFrame {
* An expression that returns a substring.
* @param startPos expression for the starting position.
* @param len expression for the length of the substring.
+ *
+ * @group expr_ops
*/
def substr(startPos: Column, len: Column): Column =
exprToColumn(Substring(expr, startPos.expr, len.expr), computable = false)
@@ -536,24 +613,51 @@ trait Column extends DataFrame {
* An expression that returns a substring.
* @param startPos starting position.
* @param len length of the substring.
+ *
+ * @group expr_ops
*/
def substr(startPos: Int, len: Int): Column =
exprToColumn(Substring(expr, lit(startPos).expr, lit(len).expr))
+ /**
+ * Contains the other element.
+ *
+ * @group expr_ops
+ */
def contains(other: Any): Column = constructColumn(other) { o =>
Contains(expr, o.expr)
}
+ /**
+ * String starts with.
+ *
+ * @group expr_ops
+ */
def startsWith(other: Column): Column = constructColumn(other) { o =>
StartsWith(expr, o.expr)
}
+ /**
+ * String starts with another string literal.
+ *
+ * @group expr_ops
+ */
def startsWith(literal: String): Column = this.startsWith(lit(literal))
+ /**
+ * String ends with.
+ *
+ * @group expr_ops
+ */
def endsWith(other: Column): Column = constructColumn(other) { o =>
EndsWith(expr, o.expr)
}
+ /**
+ * String ends with another string literal.
+ *
+ * @group expr_ops
+ */
def endsWith(literal: String): Column = this.endsWith(lit(literal))
/**
@@ -562,6 +666,8 @@ trait Column extends DataFrame {
* // Renames colA to colB in select output.
* df.select($"colA".as("colB"))
* }}}
+ *
+ * @group expr_ops
*/
override def as(alias: String): Column = exprToColumn(Alias(expr, alias)())
@@ -571,6 +677,8 @@ trait Column extends DataFrame {
* // Renames colA to colB in select output.
* df.select($"colA".as('colB))
* }}}
+ *
+ * @group expr_ops
*/
override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)())
@@ -584,6 +692,8 @@ trait Column extends DataFrame {
* // equivalent to
* df.select(df("colA").cast("int"))
* }}}
+ *
+ * @group expr_ops
*/
def cast(to: DataType): Column = exprToColumn(Cast(expr, to))
@@ -595,6 +705,8 @@ trait Column extends DataFrame {
* // Casts colA to integer.
* df.select(df("colA").cast("int"))
* }}}
+ *
+ * @group expr_ops
*/
def cast(to: String): Column = exprToColumn(
Cast(expr, to.toLowerCase match {
@@ -613,10 +725,39 @@ trait Column extends DataFrame {
})
)
+ /**
+ * Returns an ordering used in sorting.
+ * {{{
+ * // Scala: sort a DataFrame by age column in descending order.
+ * df.sort(df("age").desc)
+ *
+ * // Java
+ * df.sort(df.col("age").desc());
+ * }}}
+ *
+ * @group expr_ops
+ */
def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false)
+ /**
+ * Returns an ordering used in sorting.
+ * {{{
+ * // Scala: sort a DataFrame by age column in ascending order.
+ * df.sort(df("age").asc)
+ *
+ * // Java
+ * df.sort(df.col("age").asc());
+ * }}}
+ *
+ * @group expr_ops
+ */
def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false)
+ /**
+ * Prints the plans (logical and physical) to the console for debugging purposes.
+ *
+ * @group df_ops
+ */
override def explain(extended: Boolean): Unit = {
if (extended) {
println(expr)
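The Column.scala changes above are documentation-only: `@groupname` tags plus per-operator examples. A brief usage sketch of the documented expression operators, assuming a hypothetical DataFrame `people` with "name" and "age" columns:

{{{
  // expr_ops in action (Scala); `people` is an assumed example DataFrame.
  val adults  = people.filter(people("age") >= 21)
  val renamed = people.select(people("name").as("fullName"))
  val sorted  = people.sort(people("age").desc)
  val aNames  = people.filter(people("name").like("A%"))
}}}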
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index e21e989f36..c0c3cb40cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -41,19 +41,23 @@ private[sql] object DataFrame {
* :: Experimental ::
* A distributed collection of data organized into named columns.
*
- * A [[DataFrame]] is equivalent to a relational table in Spark SQL, and can be created using
- * various functions in [[SQLContext]].
+ * A [[DataFrame]] is equivalent to a relational table in Spark SQL. There are multiple ways
+ * to create a [[DataFrame]]:
* {{{
+ * // Create a DataFrame from Parquet files
* val people = sqlContext.parquetFile("...")
+ *
+ * // Create a DataFrame from data sources
+ * val df =
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[DataFrame]] (this class), [[Column]], [[functions]] for the DSL.
+ * defined in: [[DataFrame]] (this class), [[Column]], and [[functions]].
*
- * To select a column from the data frame, use the apply method:
+ * To select a column from the data frame, use the `apply` method in Scala and `col` in Java.
* {{{
* val ageCol = people("age") // in Scala
- * Column ageCol = people.apply("age") // in Java
+ * Column ageCol = people.col("age") // in Java
* }}}
*
* Note that the [[Column]] type can also be manipulated through its various functions.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 3c1cf8d5e3..848ea2e056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -238,9 +238,10 @@ private[sql] class DataFrameImpl protected[sql](
}
override def withColumnRenamed(existingName: String, newName: String): DataFrame = {
+ val resolver = sqlContext.analyzer.resolver
val colNames = schema.map { field =>
val name = field.name
- if (name == existingName) Column(name).as(newName) else Column(name)
+ if (resolver(name, existingName)) Column(name).as(newName) else Column(name)
}
select(colNames :_*)
}
@@ -368,6 +369,7 @@ private[sql] class DataFrameImpl protected[sql](
/////////////////////////////////////////////////////////////////////////////
override def rdd: RDD[Row] = {
+ // use a local variable to make sure the map closure doesn't capture the whole DataFrame
val schema = this.schema
queryExecution.executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
}
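Two small behavioral tweaks in DataFrameImpl: `withColumnRenamed` now compares column names through the analyzer's resolver rather than plain string equality, and `rdd` copies the schema into a local val so the map closure does not capture the whole DataFrame. The latter is a common Spark closure pattern; a minimal sketch with a hypothetical holder class (illustration only, not the actual DataFrameImpl code):

{{{
  import org.apache.spark.rdd.RDD

  // Hypothetical class, for illustration only.
  class Holder(val prefix: String, data: RDD[Int]) {
    def tagged: RDD[String] = {
      val p = prefix            // local copy: the closure captures only a String,
      data.map(x => s"$p:$x")   // not `this`, which may be large or unserializable
    }
  }
}}}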
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 2ecf086de9..17158303b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import scala.collection.JavaConversions._
import scala.language.implicitConversions
+import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
@@ -27,8 +28,10 @@ import org.apache.spark.sql.types.NumericType
/**
+ * :: Experimental ::
* A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]].
*/
+@Experimental
class GroupedData protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) {
private[this] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index aa0fc3e359..0aae0942ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,7 +25,7 @@ import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
@@ -43,14 +43,13 @@ import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
- * :: AlphaComponent ::
* The entry point for running relational queries using Spark. Allows the creation of [[DataFrame]]
* objects and the execution of SQL queries.
*
+ * @groupname ddl_ops Catalog DDL functions
* @groupname userf Spark SQL Functions
* @groupname Ungrouped Support functions for language integrated queries.
*/
-@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
extends org.apache.spark.Logging
with Serializable {
@@ -127,13 +126,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val cacheManager = new CacheManager(this)
/**
+ * :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
* the query planner for advanced functionalities.
*/
+ @Experimental
@transient
val experimental: ExperimentalMethods = new ExperimentalMethods(this)
- /** Returns a [[DataFrame]] with no rows or columns. */
+ /**
+ * :: Experimental ::
+ * Returns a [[DataFrame]] with no rows or columns.
+ */
+ @Experimental
@transient
lazy val emptyDataFrame = DataFrame(this, NoRelation)
@@ -178,9 +183,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/**
- * (Scala-specific)
- * Implicit methods available in Scala for converting common Scala objects into [[DataFrame]]s.
+ * :: Experimental ::
+ * (Scala-specific) Implicit methods available in Scala for converting
+ * common Scala objects into [[DataFrame]]s.
*/
+ @Experimental
object implicits extends Serializable {
// scalastyle:on
@@ -250,10 +257,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * :: Experimental ::
* Creates a DataFrame from an RDD of case classes.
*
* @group userf
*/
+ @Experimental
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
SparkPlan.currentContext.set(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
@@ -263,8 +272,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * :: Experimental ::
* Creates a DataFrame from a local Seq of Product.
*/
+ @Experimental
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
SparkPlan.currentContext.set(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
@@ -281,7 +292,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: DeveloperApi ::
- * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
+ * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s using the given schema.
* It is important to make sure that the structure of every [[Row]] of the provided RDD matches
* the provided schema. Otherwise, there will be runtime exception.
* Example:
@@ -316,6 +327,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
DataFrame(this, logicalPlan)
}
+ /**
+ * :: DeveloperApi ::
+ * Creates a [[DataFrame]] from a [[JavaRDD]] containing [[Row]]s using the given schema.
+ * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
+ * the provided schema. Otherwise, there will be runtime exception.
+ */
@DeveloperApi
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD.rdd, schema)
@@ -402,13 +419,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
- @DeveloperApi
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}
- @DeveloperApi
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
@@ -437,18 +452,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
- * Loads a Parquet file, returning the result as a [[DataFrame]].
+ * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
+ * [[DataFrame]] if no paths are passed in.
*
* @group userf
*/
@scala.annotation.varargs
- def parquetFile(path: String, paths: String*): DataFrame =
- if (conf.parquetUseDataSourceApi) {
- baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
+ def parquetFile(paths: String*): DataFrame = {
+ if (paths.isEmpty) {
+ emptyDataFrame
+ } else if (conf.parquetUseDataSourceApi) {
+ baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
} else {
DataFrame(this, parquet.ParquetRelation(
- (path +: paths).mkString(","), Some(sparkContext.hadoopConfiguration), this))
+ paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
}
+ }
/**
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
@@ -545,8 +564,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * Returns the dataset stored at path as a DataFrame,
- * using the given data source.
+ * Returns the dataset stored at path as a DataFrame, using the given data source.
*/
@Experimental
def load(path: String, source: String): DataFrame = {
@@ -555,7 +573,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
*/
@Experimental
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
@@ -564,8 +583,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * (Scala-specific)
- * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
*/
@Experimental
def load(source: String, options: Map[String, String]): DataFrame = {
@@ -575,8 +594,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * Returns the dataset specified by the given data source and a set of options as a DataFrame,
- * using the given schema as the schema of the DataFrame.
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*/
@Experimental
def load(
@@ -588,9 +607,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * (Scala-specific)
- * Returns the dataset specified by the given data source and a set of options as a DataFrame,
- * using the given schema as the schema of the DataFrame.
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*/
@Experimental
def load(
@@ -605,6 +623,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(tableName: String, path: String): DataFrame = {
@@ -616,6 +636,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Creates an external table from the given path based on a data source
* and returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(
@@ -629,6 +651,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Creates an external table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(
@@ -643,6 +667,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* (Scala-specific)
* Creates an external table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(
@@ -666,6 +692,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(
@@ -681,6 +709,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* (Scala-specific)
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
+ *
+ * @group ddl_ops
*/
@Experimental
def createExternalTable(
@@ -734,26 +764,23 @@ class SQLContext(@transient val sparkContext: SparkContext)
* of the RDD.
*/
@Experimental
- def jdbcRDD(url: String, table: String, theParts: Array[String]):
- DataFrame = {
- val parts: Array[Partition] = theParts.zipWithIndex.map(
- x => JDBCPartition(x._1, x._2).asInstanceOf[Partition])
+ def jdbcRDD(url: String, table: String, theParts: Array[String]): DataFrame = {
+ val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
+ JDBCPartition(part, i) : Partition
+ }
jdbcRDD(url, table, parts)
}
- private def jdbcRDD(url: String, table: String, parts: Array[Partition]):
- DataFrame = {
+ private def jdbcRDD(url: String, table: String, parts: Array[Partition]): DataFrame = {
val relation = JDBCRelation(url, table, parts)(this)
baseRelationToDataFrame(relation)
}
/**
- * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
+ * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
- *
- * @group userf
*/
- def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
+ private[sql] def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
catalog.registerTable(Seq(tableName), rdd.logicalPlan)
}
@@ -763,7 +790,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @param tableName the name of the table to be unregistered.
*
- * @group userf
+ * @group ddl_ops
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
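Most of the SQLContext changes are annotation and `@group` updates plus the varargs `parquetFile`. The data-source `load` variants documented here can be used as follows (a sketch, assuming an existing `sqlContext`; the "json" source name and path are illustrative):

{{{
  val byPath   = sqlContext.load("/data/events.json", "json")
  val byOption = sqlContext.load("json", Map("path" -> "/data/events.json"))
}}}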
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index d8b0a3b26d..8051df2992 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.types.DataType
/**
- * Functions for registering user-defined functions.
+ * Functions for registering user-defined functions. Use [[SQLContext.udf]] to access this.
*/
-class UDFRegistration(sqlContext: SQLContext) extends Logging {
+class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
private val functionRegistry = sqlContext.functionRegistry
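Per the updated UDFRegistration doc, the registry is reached through `SQLContext.udf`. A hedged example (the UDF name, function, and table are illustrative):

{{{
  sqlContext.udf.register("strLen", (s: String) => s.length)
  sqlContext.sql("SELECT strLen(name) FROM people")  // assumes a registered "people" table
}}}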
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4a0ec0b72c..2a1e086891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions._
@@ -27,8 +28,15 @@ import org.apache.spark.sql.types._
/**
- * Domain specific functions available for [[DataFrame]].
+ * :: Experimental ::
+ * Functions available for [[DataFrame]].
+ *
+ * @groupname udf_funcs UDF functions
+ * @groupname agg_funcs Aggregate functions
+ * @groupname normal_funcs Non-aggregate functions
+ * @groupname Ungrouped Support functions for DataFrames.
*/
+@Experimental
// scalastyle:off
object functions {
// scalastyle:on
@@ -37,11 +45,15 @@ object functions {
/**
* Returns a [[Column]] based on the given column name.
+ *
+ * @group normal_funcs
*/
def col(colName: String): Column = Column(colName)
/**
* Returns a [[Column]] based on the given column name. Alias of [[col]].
+ *
+ * @group normal_funcs
*/
def column(colName: String): Column = Column(colName)
@@ -51,6 +63,8 @@ object functions {
* The passed in object is returned directly if it is already a [[Column]].
* If the object is a Scala Symbol, it is converted into a [[Column]] also.
* Otherwise, a new [[Column]] is created to represent the literal value.
+ *
+ * @group normal_funcs
*/
def lit(literal: Any): Column = {
literal match {
@@ -84,80 +98,168 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////
- /** Aggregate function: returns the sum of all values in the expression. */
+ /**
+ * Aggregate function: returns the sum of all values in the expression.
+ *
+ * @group agg_funcs
+ */
def sum(e: Column): Column = Sum(e.expr)
- /** Aggregate function: returns the sum of all values in the given column. */
+ /**
+ * Aggregate function: returns the sum of all values in the given column.
+ *
+ * @group agg_funcs
+ */
def sum(columnName: String): Column = sum(Column(columnName))
- /** Aggregate function: returns the sum of distinct values in the expression. */
+ /**
+ * Aggregate function: returns the sum of distinct values in the expression.
+ *
+ * @group agg_funcs
+ */
def sumDistinct(e: Column): Column = SumDistinct(e.expr)
- /** Aggregate function: returns the sum of distinct values in the expression. */
+ /**
+ * Aggregate function: returns the sum of distinct values in the expression.
+ *
+ * @group agg_funcs
+ */
def sumDistinct(columnName: String): Column = sumDistinct(Column(columnName))
- /** Aggregate function: returns the number of items in a group. */
+ /**
+ * Aggregate function: returns the number of items in a group.
+ *
+ * @group agg_funcs
+ */
def count(e: Column): Column = e.expr match {
// Turn count(*) into count(1)
case s: Star => Count(Literal(1))
case _ => Count(e.expr)
}
- /** Aggregate function: returns the number of items in a group. */
+ /**
+ * Aggregate function: returns the number of items in a group.
+ *
+ * @group agg_funcs
+ */
def count(columnName: String): Column = count(Column(columnName))
- /** Aggregate function: returns the number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
@scala.annotation.varargs
def countDistinct(expr: Column, exprs: Column*): Column =
CountDistinct((expr +: exprs).map(_.expr))
- /** Aggregate function: returns the number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
@scala.annotation.varargs
def countDistinct(columnName: String, columnNames: String*): Column =
countDistinct(Column(columnName), columnNames.map(Column.apply) :_*)
- /** Aggregate function: returns the approximate number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the approximate number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
def approxCountDistinct(e: Column): Column = ApproxCountDistinct(e.expr)
- /** Aggregate function: returns the approximate number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the approximate number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
def approxCountDistinct(columnName: String): Column = approxCountDistinct(column(columnName))
- /** Aggregate function: returns the approximate number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the approximate number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
def approxCountDistinct(e: Column, rsd: Double): Column = ApproxCountDistinct(e.expr, rsd)
- /** Aggregate function: returns the approximate number of distinct items in a group. */
+ /**
+ * Aggregate function: returns the approximate number of distinct items in a group.
+ *
+ * @group agg_funcs
+ */
def approxCountDistinct(columnName: String, rsd: Double): Column = {
approxCountDistinct(Column(columnName), rsd)
}
- /** Aggregate function: returns the average of the values in a group. */
+ /**
+ * Aggregate function: returns the average of the values in a group.
+ *
+ * @group agg_funcs
+ */
def avg(e: Column): Column = Average(e.expr)
- /** Aggregate function: returns the average of the values in a group. */
+ /**
+ * Aggregate function: returns the average of the values in a group.
+ *
+ * @group agg_funcs
+ */
def avg(columnName: String): Column = avg(Column(columnName))
- /** Aggregate function: returns the first value in a group. */
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * @group agg_funcs
+ */
def first(e: Column): Column = First(e.expr)
- /** Aggregate function: returns the first value of a column in a group. */
+ /**
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * @group agg_funcs
+ */
def first(columnName: String): Column = first(Column(columnName))
- /** Aggregate function: returns the last value in a group. */
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * @group agg_funcs
+ */
def last(e: Column): Column = Last(e.expr)
- /** Aggregate function: returns the last value of the column in a group. */
+ /**
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * @group agg_funcs
+ */
def last(columnName: String): Column = last(Column(columnName))
- /** Aggregate function: returns the minimum value of the expression in a group. */
+ /**
+ * Aggregate function: returns the minimum value of the expression in a group.
+ *
+ * @group agg_funcs
+ */
def min(e: Column): Column = Min(e.expr)
- /** Aggregate function: returns the minimum value of the column in a group. */
+ /**
+ * Aggregate function: returns the minimum value of the column in a group.
+ *
+ * @group agg_funcs
+ */
def min(columnName: String): Column = min(Column(columnName))
- /** Aggregate function: returns the maximum value of the expression in a group. */
+ /**
+ * Aggregate function: returns the maximum value of the expression in a group.
+ *
+ * @group agg_funcs
+ */
def max(e: Column): Column = Max(e.expr)
- /** Aggregate function: returns the maximum value of the column in a group. */
+ /**
+ * Aggregate function: returns the maximum value of the column in a group.
+ *
+ * @group agg_funcs
+ */
def max(columnName: String): Column = max(Column(columnName))
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -168,6 +270,8 @@ object functions {
* {{{
* df.select(coalesce(df("a"), df("b")))
* }}}
+ *
+ * @group normal_funcs
*/
@scala.annotation.varargs
def coalesce(e: Column*): Column = Coalesce(e.map(_.expr))
@@ -182,6 +286,8 @@ object functions {
* // Java:
* df.select( negate(df.col("amount")) );
* }}}
+ *
+ * @group normal_funcs
*/
def negate(e: Column): Column = -e
@@ -194,19 +300,37 @@ object functions {
* // Java:
* df.filter( not(df.col("isActive")) );
* }}}
+ *
+ * @group normal_funcs
*/
def not(e: Column): Column = !e
- /** Converts a string expression to upper case. */
+ /**
+ * Converts a string expression to upper case.
+ *
+ * @group normal_funcs
+ */
def upper(e: Column): Column = Upper(e.expr)
- /** Converts a string exprsesion to lower case. */
+ /**
+ * Converts a string expression to lower case.
+ *
+ * @group normal_funcs
+ */
def lower(e: Column): Column = Lower(e.expr)
- /** Computes the square root of the specified float value. */
+ /**
+ * Computes the square root of the specified float value.
+ *
+ * @group normal_funcs
+ */
def sqrt(e: Column): Column = Sqrt(e.expr)
- /** Computes the absolutle value. */
+ /**
+ * Computes the absolute value.
+ *
+ * @group normal_funcs
+ */
def abs(e: Column): Column = Abs(e.expr)
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -222,6 +346,8 @@ object functions {
/**
* Defines a user-defined function of ${x} arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -236,6 +362,8 @@ object functions {
/**
* Call a Scala function of ${x} arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function$x[$fTypes], returnType: DataType${if (args.length > 0) ", " + args else ""}): Column = {
ScalaUdf(f, returnType, Seq($argsInUdf))
@@ -246,6 +374,8 @@ object functions {
/**
* Defines a user-defined function of 0 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -254,6 +384,8 @@ object functions {
/**
* Defines a user-defined function of 1 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -262,6 +394,8 @@ object functions {
/**
* Defines a user-defined function of 2 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -270,6 +404,8 @@ object functions {
/**
* Defines a user-defined function of 3 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -278,6 +414,8 @@ object functions {
/**
* Defines a user-defined function of 4 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -286,6 +424,8 @@ object functions {
/**
* Defines a user-defined function of 5 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -294,6 +434,8 @@ object functions {
/**
* Defines a user-defined function of 6 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -302,6 +444,8 @@ object functions {
/**
* Defines a user-defined function of 7 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -310,6 +454,8 @@ object functions {
/**
* Defines a user-defined function of 8 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -318,6 +464,8 @@ object functions {
/**
* Defines a user-defined function of 9 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -326,6 +474,8 @@ object functions {
/**
* Defines a user-defined function of 10 arguments as user-defined function (UDF).
* The data types are automatically inferred based on the function's signature.
+ *
+ * @group udf_funcs
*/
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
@@ -336,6 +486,8 @@ object functions {
/**
* Call a Scala function of 0 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function0[_], returnType: DataType): Column = {
ScalaUdf(f, returnType, Seq())
@@ -344,6 +496,8 @@ object functions {
/**
* Call a Scala function of 1 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function1[_, _], returnType: DataType, arg1: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr))
@@ -352,6 +506,8 @@ object functions {
/**
* Call a Scala function of 2 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function2[_, _, _], returnType: DataType, arg1: Column, arg2: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr))
@@ -360,6 +516,8 @@ object functions {
/**
* Call a Scala function of 3 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function3[_, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr))
@@ -368,6 +526,8 @@ object functions {
/**
* Call a Scala function of 4 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function4[_, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr))
@@ -376,6 +536,8 @@ object functions {
/**
* Call a Scala function of 5 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function5[_, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr))
@@ -384,6 +546,8 @@ object functions {
/**
* Call a Scala function of 6 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function6[_, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr))
@@ -392,6 +556,8 @@ object functions {
/**
* Call a Scala function of 7 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function7[_, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr))
@@ -400,6 +566,8 @@ object functions {
/**
* Call a Scala function of 8 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function8[_, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr))
@@ -408,6 +576,8 @@ object functions {
/**
* Call a Scala function of 9 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function9[_, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr))
@@ -416,6 +586,8 @@ object functions {
/**
* Call a Scala function of 10 arguments as user-defined function (UDF). This requires
* you to specify the return data type.
+ *
+ * @group udf_funcs
*/
def callUDF(f: Function10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType, arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column): Column = {
ScalaUdf(f, returnType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr))
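The `functions` changes group everything into `udf_funcs`, `agg_funcs`, and `normal_funcs`. A short sketch combining a few of the documented functions, assuming a DataFrame `df` with "dept" and "salary" columns:

{{{
  import org.apache.spark.sql.functions._

  df.groupBy(df("dept")).agg(avg("salary"), max("salary"), countDistinct("salary"))
  df.select(upper(df("dept")), abs(df("salary")))
}}}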
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d2c39ab621..e63cea6045 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -143,7 +143,7 @@ private[hive] trait HiveStrategies {
PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
} else {
hiveContext
- .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
+ .parquetFile(partitionLocations: _*)
.addPartitioningAttributes(relation.partitionKeys)
.lowerCase
.where(unresolvedOtherPredicates)
@@ -152,6 +152,7 @@ private[hive] trait HiveStrategies {
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
+
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.toString)