author    Michael Armbrust <michael@databricks.com>  2015-02-17 10:21:17 -0800
committer Michael Armbrust <michael@databricks.com>  2015-02-17 10:21:17 -0800
commit    c74b07fa94a8da50437d952ae05cf6ac70fbb93e (patch)
tree      63b906b18df550b9f8ccfe13dcff0c1d318cf546
parent    c76da36c2163276b5c34e59fbb139eeb34ed0faa (diff)
[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation
Author: Michael Armbrust <michael@databricks.com>

Closes #4642 from marmbrus/docs and squashes the following commits:

d291c34 [Michael Armbrust] python tests
9be66e3 [Michael Armbrust] comments
d56afc2 [Michael Armbrust] fix style
f004747 [Michael Armbrust] fix build
c4a907b [Michael Armbrust] fix tests
42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up.
-rw-r--r--  project/SparkBuild.scala                                                    |  12
-rw-r--r--  python/pyspark/sql/context.py                                               |  28
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java             |  59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                | 153
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala            |  33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala      |   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala       |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala               | 200
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala      |   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/package.scala              |  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala       |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala  |  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala                |  74
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala      |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala       |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala              |   4
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java          | 102
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala           |   7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala      |  20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala    |  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala |   4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala  |   8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala) | 19
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala             |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala |  56
-rw-r--r--  sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala      |   9
-rw-r--r--  sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala      |   9
30 files changed, 483 insertions(+), 405 deletions(-)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8fb1239b4a..e4b1b96527 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,9 +361,16 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn),
+ inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn),
+
+ // Skip actual catalyst, but include the subproject.
+ // Catalyst is not public API and contains quasiquotes which break scaladoc.
+ unidocAllSources in (ScalaUnidoc, unidoc) := {
+ (unidocAllSources in (ScalaUnidoc, unidoc)).value
+ .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+ },
// Skip class names containing $ and some internal packages in Javadocs
unidocAllSources in (JavaUnidoc, unidoc) := {
@@ -376,6 +383,7 @@ object Unidoc {
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
+ .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
},
// Javadoc options: create a window title, and group key packages on index page
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index dd2cd5ee76..2e2309f103 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -252,7 +252,7 @@ class SQLContext(object):
>>> schema = StructType([StructField("field1", IntegerType(), False),
... StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT * from table1")
>>> df2.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
@@ -405,17 +405,17 @@ class SQLContext(object):
return self.applySchema(data, schema)
- def registerRDDAsTable(self, rdd, tableName):
+ def registerDataFrameAsTable(self, rdd, tableName):
"""Registers the given RDD as a temporary table in the catalog.
Temporary tables exist only during the lifetime of this instance of
SQLContext.
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
"""
if (rdd.__class__ is DataFrame):
df = rdd._jdf
- self._ssql_ctx.registerRDDAsTable(df, tableName)
+ self._ssql_ctx.registerDataFrameAsTable(df, tableName)
else:
raise ValueError("Can only register DataFrame as table")
@@ -456,7 +456,7 @@ class SQLContext(object):
... print>>ofn, json
>>> ofn.close()
>>> df1 = sqlCtx.jsonFile(jsonFile)
- >>> sqlCtx.registerRDDAsTable(df1, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
@@ -467,7 +467,7 @@ class SQLContext(object):
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
- >>> sqlCtx.registerRDDAsTable(df3, "table2")
+ >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
@@ -485,7 +485,7 @@ class SQLContext(object):
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonFile(jsonFile, schema)
- >>> sqlCtx.registerRDDAsTable(df5, "table3")
+ >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
@@ -509,7 +509,7 @@ class SQLContext(object):
determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
- >>> sqlCtx.registerRDDAsTable(df1, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
@@ -520,7 +520,7 @@ class SQLContext(object):
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df3 = sqlCtx.jsonRDD(json, df1.schema)
- >>> sqlCtx.registerRDDAsTable(df3, "table2")
+ >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
@@ -538,7 +538,7 @@ class SQLContext(object):
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonRDD(json, schema)
- >>> sqlCtx.registerRDDAsTable(df5, "table3")
+ >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
@@ -628,7 +628,7 @@ class SQLContext(object):
def sql(self, sqlQuery):
"""Return a L{DataFrame} representing the result of the given query.
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@@ -638,7 +638,7 @@ class SQLContext(object):
def table(self, tableName):
"""Returns the specified table as a L{DataFrame}.
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
@@ -653,7 +653,7 @@ class SQLContext(object):
The returned DataFrame has two columns, tableName and isTemporary
(a column with BooleanType indicating if a table is a temporary one or not).
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
@@ -668,7 +668,7 @@ class SQLContext(object):
If `dbName` is not specified, the current database will be used.
- >>> sqlCtx.registerRDDAsTable(df, "table1")
+ >>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
True
>>> "table1" in sqlCtx.tableNames("db")
diff --git a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java b/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java
deleted file mode 100644
index aa441b2096..0000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/jdbc/JDBCUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc;
-
-import org.apache.spark.Partition;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.DataFrame;
-
-public class JDBCUtils {
- /**
- * Construct a DataFrame representing the JDBC table at the database
- * specified by url with table name table.
- */
- public static DataFrame jdbcRDD(SQLContext sql, String url, String table) {
- Partition[] parts = new Partition[1];
- parts[0] = new JDBCPartition(null, 0);
- return sql.baseRelationToDataFrame(
- new JDBCRelation(url, table, parts, sql));
- }
-
- /**
- * Construct a DataFrame representing the JDBC table at the database
- * specified by url with table name table partitioned by parts.
- * Here, parts is an array of expressions suitable for insertion into a WHERE
- * clause; each one defines one partition.
- */
- public static DataFrame jdbcRDD(SQLContext sql, String url, String table, String[] parts) {
- Partition[] partitions = new Partition[parts.length];
- for (int i = 0; i < parts.length; i++)
- partitions[i] = new JDBCPartition(parts[i], i);
- return sql.baseRelationToDataFrame(
- new JDBCRelation(url, table, partitions, sql));
- }
-
- private static JavaJDBCTrampoline trampoline = new JavaJDBCTrampoline();
-
- public static void createJDBCTable(DataFrame rdd, String url, String table, boolean allowExisting) {
- trampoline.createJDBCTable(rdd, url, table, allowExisting);
- }
-
- public static void insertIntoJDBC(DataFrame rdd, String url, String table, boolean overwrite) {
- trampoline.insertIntoJDBC(rdd, url, table, overwrite);
- }
-}
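The deleted Java helper is superseded by the `jdbc` methods added to `SQLContext` and the JDBC write methods added to `DataFrame` later in this patch. A hedged sketch of the replacement read path, assuming a reachable database (the connection URL, table name, and predicates are placeholders):

{{{
// Replacement for JDBCUtils.jdbcRDD: the new SQLContext.jdbc overloads.
val people = sqlContext.jdbc("jdbc:h2:mem:testdb", "PEOPLE")

// Partitioned variant: each WHERE-clause predicate defines one partition.
val partitioned = sqlContext.jdbc(
  "jdbc:h2:mem:testdb", "PEOPLE", Array("id < 100", "id >= 100"))
}}}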
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c0c3cb40cf..fa5fe84263 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql
+import java.sql.DriverManager
+
+
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -27,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -77,6 +81,12 @@ private[sql] object DataFrame {
* .groupBy(department("name"), "gender")
* .agg(avg(people("salary")), max(people("age")))
* }}}
+ *
+ * @groupname basic Basic DataFrame functions
+ * @groupname dfops Language Integrated Queries
+ * @groupname rdd RDD Operations
+ * @groupname output Output Operations
+ * @groupname action Actions
*/
// TODO: Improve documentation.
@Experimental
@@ -102,7 +112,8 @@ trait DataFrame extends RDDApi[Row] with Serializable {
def toSchemaRDD: DataFrame = this
/**
- * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
+ * Returns the object itself.
+ * @group basic
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
@@ -116,31 +127,51 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2
* rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
+ * @group basic
*/
@scala.annotation.varargs
def toDF(colNames: String*): DataFrame
- /** Returns the schema of this [[DataFrame]]. */
+ /**
+ * Returns the schema of this [[DataFrame]].
+ * @group basic
+ */
def schema: StructType
- /** Returns all column names and their data types as an array. */
+ /**
+ * Returns all column names and their data types as an array.
+ * @group basic
+ */
def dtypes: Array[(String, String)]
- /** Returns all column names as an array. */
+ /**
+ * Returns all column names as an array.
+ * @group basic
+ */
def columns: Array[String] = schema.fields.map(_.name)
- /** Prints the schema to the console in a nice tree format. */
+ /**
+ * Prints the schema to the console in a nice tree format.
+ * @group basic
+ */
def printSchema(): Unit
- /** Prints the plans (logical and physical) to the console for debugging purpose. */
+ /**
+ * Prints the plans (logical and physical) to the console for debugging purpose.
+ * @group basic
+ */
def explain(extended: Boolean): Unit
- /** Only prints the physical plan to the console for debugging purpose. */
+ /**
+ * Only prints the physical plan to the console for debugging purpose.
+ * @group basic
+ */
def explain(): Unit = explain(extended = false)
/**
* Returns true if the `collect` and `take` methods can be run locally
* (without any Spark executors).
+ * @group basic
*/
def isLocal: Boolean
@@ -154,6 +185,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* 1983 03 0.410516 0.442194
* 1984 04 0.450090 0.483521
* }}}
+ * @group basic
*/
def show(): Unit
@@ -163,6 +195,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
+ * @group dfops
*/
def join(right: DataFrame): DataFrame
@@ -174,6 +207,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
+ * @group dfops
*/
def join(right: DataFrame, joinExprs: Column): DataFrame
@@ -194,6 +228,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
+ * @group dfops
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
@@ -205,6 +240,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.sort($"sortcol")
* df.sort($"sortcol".asc)
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): DataFrame
@@ -214,6 +250,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.sort($"col1", $"col2".desc)
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): DataFrame
@@ -221,6 +258,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
+ * @group dfops
*/
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): DataFrame
@@ -228,27 +266,32 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
+ * @group dfops
*/
@scala.annotation.varargs
def orderBy(sortExprs: Column*): DataFrame
/**
* Selects column based on the column name and return it as a [[Column]].
+ * @group dfops
*/
def apply(colName: String): Column = col(colName)
/**
* Selects column based on the column name and return it as a [[Column]].
+ * @group dfops
*/
def col(colName: String): Column
/**
* Returns a new [[DataFrame]] with an alias set.
+ * @group dfops
*/
def as(alias: String): DataFrame
/**
* (Scala-specific) Returns a new [[DataFrame]] with an alias set.
+ * @group dfops
*/
def as(alias: Symbol): DataFrame
@@ -257,6 +300,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.select($"colA", $"colB" + 1)
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame
@@ -270,6 +314,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.select("colA", "colB")
* df.select($"colA", $"colB")
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame
@@ -281,6 +326,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.selectExpr("colA", "colB as newName", "abs(colC)")
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame
@@ -293,6 +339,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* peopleDf.where($"age" > 15)
* peopleDf($"age" > 15)
* }}}
+ * @group dfops
*/
def filter(condition: Column): DataFrame
@@ -301,6 +348,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* peopleDf.filter("age > 15")
* }}}
+ * @group dfops
*/
def filter(conditionExpr: String): DataFrame
@@ -312,6 +360,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* peopleDf.where($"age" > 15)
* peopleDf($"age" > 15)
* }}}
+ * @group dfops
*/
def where(condition: Column): DataFrame
@@ -329,6 +378,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "age" -> "max"
* ))
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def groupBy(cols: Column*): GroupedData
@@ -350,6 +400,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "age" -> "max"
* ))
* }}}
+ * @group dfops
*/
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): GroupedData
@@ -366,6 +417,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "expense" -> "sum"
* )
* }}}
+ * @group dfops
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs :_*)
@@ -378,6 +430,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(Map("age" -> "max", "salary" -> "avg"))
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}
+ * @group dfops
*/
def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
@@ -388,6 +441,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(Map("age" -> "max", "salary" -> "avg"))
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}
+ * @group dfops
*/
def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
@@ -398,6 +452,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(max($"age"), avg($"salary"))
* df.groupBy().agg(max($"age"), avg($"salary"))
* }}
+ * @group dfops
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
@@ -405,24 +460,28 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
* and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
+ * @group dfops
*/
def limit(n: Int): DataFrame
/**
* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
+ * @group dfops
*/
def unionAll(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
+ * @group dfops
*/
def intersect(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
+ * @group dfops
*/
def except(other: DataFrame): DataFrame
@@ -432,6 +491,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
* @param seed Seed for sampling.
+ * @group dfops
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
@@ -440,6 +500,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
*
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
+ * @group dfops
*/
def sample(withReplacement: Boolean, fraction: Double): DataFrame = {
sample(withReplacement, fraction, Utils.random.nextLong)
@@ -464,6 +525,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
*
* val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
* }}}
+ * @group dfops
*/
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame
@@ -476,6 +538,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.explode("words", "word")(words: String => words.split(" "))
* }}}
+ * @group dfops
*/
def explode[A, B : TypeTag](
inputColumn: String,
@@ -486,11 +549,13 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] by adding a column.
+ * @group dfops
*/
def withColumn(colName: String, col: Column): DataFrame
/**
* Returns a new [[DataFrame]] with a column renamed.
+ * @group dfops
*/
def withColumnRenamed(existingName: String, newName: String): DataFrame
@@ -511,62 +576,84 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new RDD by applying a function to all rows of this DataFrame.
+ * @group rdd
*/
override def map[R: ClassTag](f: Row => R): RDD[R]
/**
* Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
* and then flattening the results.
+ * @group rdd
*/
override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R]
/**
* Returns a new RDD by applying a function to each partition of this DataFrame.
+ * @group rdd
*/
override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R]
/**
* Applies a function `f` to all rows.
+ * @group rdd
*/
override def foreach(f: Row => Unit): Unit
/**
* Applies a function f to each partition of this [[DataFrame]].
+ * @group rdd
*/
override def foreachPartition(f: Iterator[Row] => Unit): Unit
/**
* Returns the first `n` rows in the [[DataFrame]].
+ * @group action
*/
override def take(n: Int): Array[Row]
/**
* Returns an array that contains all of [[Row]]s in this [[DataFrame]].
+ * @group action
*/
override def collect(): Array[Row]
/**
* Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
+ * @group action
*/
override def collectAsList(): java.util.List[Row]
/**
* Returns the number of rows in the [[DataFrame]].
+ * @group action
*/
override def count(): Long
/**
* Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+ * @group rdd
*/
override def repartition(numPartitions: Int): DataFrame
- /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
+ /**
+ * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+ * @group dfops
+ */
override def distinct: DataFrame
+ /**
+ * @group basic
+ */
override def persist(): this.type
+ /**
+ * @group basic
+ */
override def persist(newLevel: StorageLevel): this.type
+ /**
+ * @group basic
+ */
override def unpersist(blocking: Boolean): this.type
/////////////////////////////////////////////////////////////////////////////
@@ -575,16 +662,19 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
+ * @group rdd
*/
def rdd: RDD[Row]
/**
* Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ * @group rdd
*/
def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
/**
* Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+ * @group rdd
*/
def javaRDD: JavaRDD[Row] = toJavaRDD
@@ -592,7 +682,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Registers this RDD as a temporary table using the given name. The lifetime of this temporary
* table is tied to the [[SQLContext]] that was used to create this DataFrame.
*
- * @group schema
+ * @group basic
*/
def registerTempTable(tableName: String): Unit
@@ -600,6 +690,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
+ * @group output
*/
def saveAsParquetFile(path: String): Unit
@@ -613,6 +704,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(tableName: String): Unit = {
@@ -628,6 +720,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
@@ -651,6 +744,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(
@@ -668,6 +762,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(
@@ -686,6 +781,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(
@@ -706,6 +802,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
+ * @group output
*/
@Experimental
def saveAsTable(
@@ -719,6 +816,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Saves the contents of this DataFrame to the given path,
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
+ * @group output
*/
@Experimental
def save(path: String): Unit = {
@@ -729,6 +827,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
+ * @group output
*/
@Experimental
def save(path: String, mode: SaveMode): Unit = {
@@ -740,6 +839,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
+ * @group output
*/
@Experimental
def save(path: String, source: String): Unit = {
@@ -750,6 +850,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
+ * @group output
*/
@Experimental
def save(path: String, source: String, mode: SaveMode): Unit = {
@@ -760,6 +861,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
+ * @group output
*/
@Experimental
def save(
@@ -774,6 +876,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* (Scala-specific)
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
+ * @group output
*/
@Experimental
def save(
@@ -784,6 +887,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+ * @group output
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit
@@ -792,16 +896,47 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Adds the rows from this RDD to the specified table.
* Throws an exception if the table already exists.
+ * @group output
*/
@Experimental
def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
+ * @group rdd
*/
def toJSON: RDD[String]
////////////////////////////////////////////////////////////////////////////
+ // JDBC Write Support
+ ////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Save this RDD to a JDBC database at `url` under the table name `table`.
+ * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
+ * If you pass `true` for `allowExisting`, it will drop any table with the
+ * given name; if you pass `false`, it will throw if the table already
+ * exists.
+ * @group output
+ */
+ def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit
+
+ /**
+ * Save this RDD to a JDBC database at `url` under the table name `table`.
+ * Assumes the table already exists and has a compatible schema. If you
+ * pass `true` for `overwrite`, it will `TRUNCATE` the table before
+ * performing the `INSERT`s.
+ *
+ * The table must already exist on the database. It must have a schema
+ * that is compatible with the schema of this RDD; inserting the rows of
+ * the RDD in order via the simple statement
+ * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
+ * @group output
+ */
+ def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit
+
+
+ ////////////////////////////////////////////////////////////////////////////
// for Python API
////////////////////////////////////////////////////////////////////////////
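A short usage sketch of the two JDBC write methods declared above, following the scaladoc just added (the URL and table name are placeholders):

{{{
// createJDBCTable runs CREATE TABLE followed by INSERT INTO statements;
// with allowExisting = true it first drops any table with the same name.
df.createJDBCTable("jdbc:h2:mem:testdb", "PEOPLE_COPY", allowExisting = false)

// insertIntoJDBC assumes the table already exists with a compatible schema;
// overwrite = true truncates it before inserting.
df.insertIntoJDBC("jdbc:h2:mem:testdb", "PEOPLE_COPY", overwrite = true)
}}}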
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 848ea2e056..25bc9d9292 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.io.CharArrayWriter
+import java.sql.DriverManager
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
+import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{NumericType, StructType}
@@ -375,7 +377,7 @@ private[sql] class DataFrameImpl protected[sql](
}
override def registerTempTable(tableName: String): Unit = {
- sqlContext.registerRDDAsTable(this, tableName)
+ sqlContext.registerDataFrameAsTable(this, tableName)
}
override def saveAsParquetFile(path: String): Unit = {
@@ -441,6 +443,35 @@ private[sql] class DataFrameImpl protected[sql](
}
}
+ def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
+ val conn = DriverManager.getConnection(url)
+ try {
+ if (allowExisting) {
+ val sql = s"DROP TABLE IF EXISTS $table"
+ conn.prepareStatement(sql).executeUpdate()
+ }
+ val schema = JDBCWriteDetails.schemaString(this, url)
+ val sql = s"CREATE TABLE $table ($schema)"
+ conn.prepareStatement(sql).executeUpdate()
+ } finally {
+ conn.close()
+ }
+ JDBCWriteDetails.saveTable(this, url, table)
+ }
+
+ def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
+ if (overwrite) {
+ val conn = DriverManager.getConnection(url)
+ try {
+ val sql = s"TRUNCATE TABLE $table"
+ conn.prepareStatement(sql).executeUpdate()
+ } finally {
+ conn.close()
+ }
+ }
+ JDBCWriteDetails.saveTable(this, url, table)
+ }
+
////////////////////////////////////////////////////////////////////////////
// for Python API
////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
index f0e6a8f332..d5d7e35a6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -20,8 +20,13 @@ package org.apache.spark.sql
import org.apache.spark.annotation.Experimental
/**
+ * :: Experimental ::
* Holder for experimental methods for the bravest. We make NO guarantee about the stability
* regarding binary compatibility and source compatibility of methods here.
+ *
+ * {{{
+ * sqlContext.experimental.extraStrategies += ...
+ * }}}
*/
@Experimental
class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index fc37cfa7a8..b48b682b36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -173,6 +173,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
+ def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = err()
+
+ def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = err()
+
override def toJSON: RDD[String] = err()
protected[sql] override def javaToPython: JavaRDD[Array[Byte]] = err()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0aae0942ca..31afa0eb59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -43,11 +43,16 @@ import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
- * The entry point for running relational queries using Spark. Allows the creation of [[DataFrame]]
- * objects and the execution of SQL queries.
+ * The entry point for working with structured data (rows and columns) in Spark. Allows the
+ * creation of [[DataFrame]] objects as well as the execution of SQL queries.
*
- * @groupname ddl_ops Catalog DDL functions
- * @groupname userf Spark SQL Functions
+ * @groupname basic Basic Operations
+ * @groupname ddl_ops Persistent Catalog DDL
+ * @groupname cachemgmt Cached Table Management
+ * @groupname genericdata Generic Data Sources
+ * @groupname specificdata Specific Data Sources
+ * @groupname config Configuration
+ * @groupname dataframes Custom DataFrame Creation
* @groupname Ungrouped Support functions for language integrated queries.
*/
class SQLContext(@transient val sparkContext: SparkContext)
@@ -61,24 +66,40 @@ class SQLContext(@transient val sparkContext: SparkContext)
// Note that this is a lazy val so we can override the default value in subclasses.
protected[sql] lazy val conf: SQLConf = new SQLConf
- /** Set Spark SQL configuration properties. */
+ /**
+ * Set Spark SQL configuration properties.
+ *
+ * @group config
+ */
def setConf(props: Properties): Unit = conf.setConf(props)
- /** Set the given Spark SQL configuration property. */
+ /**
+ * Set the given Spark SQL configuration property.
+ *
+ * @group config
+ */
def setConf(key: String, value: String): Unit = conf.setConf(key, value)
- /** Return the value of Spark SQL configuration property for the given key. */
+ /**
+ * Return the value of Spark SQL configuration property for the given key.
+ *
+ * @group config
+ */
def getConf(key: String): String = conf.getConf(key)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue`.
+ *
+ * @group config
*/
def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
/**
* Return all the configuration properties that have been set (i.e. not the default).
* This creates a new copy of the config properties in the form of a Map.
+ *
+ * @group config
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
@@ -128,7 +149,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
- * the query planner for advanced functionalities.
+ * the query planner for advanced functionality.
+ *
+ * @group basic
*/
@Experimental
@transient
@@ -137,6 +160,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Returns a [[DataFrame]] with no rows or columns.
+ *
+ * @group basic
*/
@Experimental
@transient
@@ -167,17 +192,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
* (Integer arg1, String arg2) -> arg2 + arg1),
* DataTypes.StringType);
* }}}
+ *
+ * @group basic
*/
@transient
val udf: UDFRegistration = new UDFRegistration(this)
- /** Returns true if the table is currently cached in-memory. */
+ /**
+ * Returns true if the table is currently cached in-memory.
+ * @group cachemgmt
+ */
def isCached(tableName: String): Boolean = cacheManager.isCached(tableName)
- /** Caches the specified table in-memory. */
+ /**
+ * Caches the specified table in-memory.
+ * @group cachemgmt
+ */
def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName)
- /** Removes the specified table from the in-memory cache. */
+ /**
+ * Removes the specified table from the in-memory cache.
+ * @group cachemgmt
+ */
def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
// scalastyle:off
@@ -186,6 +222,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Implicit methods available in Scala for converting
* common Scala objects into [[DataFrame]]s.
+ *
+ * {{{
+ * val sqlContext = new SQLContext
+ * import sqlContext._
+ * }}}
+ *
+ * @group basic
*/
@Experimental
object implicits extends Serializable {
@@ -260,7 +303,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Creates a DataFrame from an RDD of case classes.
*
- * @group userf
+ * @group dataframes
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
@@ -274,6 +317,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Creates a DataFrame from a local Seq of Product.
+ *
+ * @group dataframes
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
@@ -285,6 +330,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]].
+ *
+ * @group dataframes
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
DataFrame(this, LogicalRelation(baseRelation))
@@ -318,6 +365,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
+ *
+ * @group dataframes
*/
@DeveloperApi
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@@ -332,6 +381,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema.
* It is important to make sure that the structure of every [[Row]] of the provided RDD matches
* the provided schema. Otherwise, there will be runtime exception.
+ *
+ * @group dataframes
*/
@DeveloperApi
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
@@ -346,6 +397,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @param rowRDD an JavaRDD of Row
* @param columns names for each column
* @return DataFrame
+ * @group dataframes
*/
def createDataFrame(rowRDD: JavaRDD[Row], columns: java.util.List[String]): DataFrame = {
createDataFrame(rowRDD.rdd, columns.toSeq)
@@ -356,6 +408,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
+ * @group dataframes
*/
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
val attributeSeq = getSchema(beanClass)
@@ -383,6 +436,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
+ * @group dataframes
*/
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd.rdd, beanClass)
@@ -416,8 +470,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
* dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
- *
- * @group userf
*/
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@@ -455,7 +507,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
- * @group userf
+ * @group specificdata
*/
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
@@ -473,7 +525,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
- * @group userf
+ * @group specificdata
*/
def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
@@ -482,7 +534,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a [[DataFrame]].
*
- * @group userf
+ * @group specificdata
*/
@Experimental
def jsonFile(path: String, schema: StructType): DataFrame = {
@@ -492,6 +544,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
+ * @group specificdata
*/
@Experimental
def jsonFile(path: String, samplingRatio: Double): DataFrame = {
@@ -504,10 +557,18 @@ class SQLContext(@transient val sparkContext: SparkContext)
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
- * @group userf
+ * @group specificdata
*/
def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[DataFrame]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ */
def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
/**
@@ -515,7 +576,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a [[DataFrame]].
*
- * @group userf
+ * @group specificdata
*/
@Experimental
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
@@ -528,6 +589,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
createDataFrame(rowRDD, appliedSchema)
}
+ /**
+ * :: Experimental ::
+ * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
@Experimental
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
jsonRDD(json.rdd, schema)
@@ -535,6 +603,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
+ * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
@@ -546,6 +618,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
createDataFrame(rowRDD, appliedSchema)
}
+ /**
+ * :: Experimental ::
+ * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
@Experimental
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
jsonRDD(json.rdd, samplingRatio);
@@ -555,6 +634,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
+ *
+ * @group genericdata
*/
@Experimental
def load(path: String): DataFrame = {
@@ -565,6 +646,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Returns the dataset stored at path as a DataFrame, using the given data source.
+ *
+ * @group genericdata
*/
@Experimental
def load(path: String, source: String): DataFrame = {
@@ -575,6 +658,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
+ *
+ * @group genericdata
*/
@Experimental
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
@@ -585,6 +670,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
+ *
+ * @group genericdata
*/
@Experimental
def load(source: String, options: Map[String, String]): DataFrame = {
@@ -596,6 +683,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ *
+ * @group genericdata
*/
@Experimental
def load(
@@ -609,6 +698,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ * @group genericdata
*/
@Experimental
def load(
@@ -733,54 +823,70 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
- * Construct an RDD representing the database table accessible via JDBC URL
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table.
+ *
+ * @group specificdata
*/
@Experimental
- def jdbcRDD(url: String, table: String): DataFrame = {
- jdbcRDD(url, table, null.asInstanceOf[JDBCPartitioningInfo])
+ def jdbc(url: String, table: String): DataFrame = {
+ jdbc(url, table, JDBCRelation.columnPartition(null))
}
/**
* :: Experimental ::
- * Construct an RDD representing the database table accessible via JDBC URL
- * url named table. The PartitioningInfo parameter
- * gives the name of a column of integral type, a number of partitions, and
- * advisory minimum and maximum values for the column. The RDD is
- * partitioned according to said column.
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table. Partitions of the table will be retrieved in parallel based on the parameters
+ * passed to this function.
+ *
+ * @param columnName the name of a column of integral type that will be used for partitioning.
+ * @param lowerBound the minimum value of `columnName` to retrieve
+ * @param upperBound the maximum value of `columnName` to retrieve
+ * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
+ * evenly into this many partitions
+ *
+ * @group specificdata
*/
@Experimental
- def jdbcRDD(url: String, table: String, partitioning: JDBCPartitioningInfo):
- DataFrame = {
+ def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int): DataFrame = {
+ val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
val parts = JDBCRelation.columnPartition(partitioning)
- jdbcRDD(url, table, parts)
+ jdbc(url, table, parts)
}
/**
* :: Experimental ::
- * Construct an RDD representing the database table accessible via JDBC URL
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. The theParts parameter gives a list expressions
* suitable for inclusion in WHERE clauses; each one defines one partition
- * of the RDD.
+ * of the [[DataFrame]].
+ *
+ * @group specificdata
*/
@Experimental
- def jdbcRDD(url: String, table: String, theParts: Array[String]): DataFrame = {
+ def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
- jdbcRDD(url, table, parts)
+ jdbc(url, table, parts)
}
- private def jdbcRDD(url: String, table: String, parts: Array[Partition]): DataFrame = {
+ private def jdbc(url: String, table: String, parts: Array[Partition]): DataFrame = {
val relation = JDBCRelation(url, table, parts)(this)
baseRelationToDataFrame(relation)
}
/**
- * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
- * during the lifetime of this instance of SQLContext.
+ * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
+ * only during the lifetime of this instance of SQLContext.
*/
- private[sql] def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
+ private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = {
catalog.registerTable(Seq(tableName), rdd.logicalPlan)
}
@@ -790,7 +896,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @param tableName the name of the table to be unregistered.
*
- * @group ddl_ops
+ * @group basic
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
@@ -801,7 +907,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
- * @group userf
+ * @group basic
*/
def sql(sqlText: String): DataFrame = {
if (conf.dialect == "sql") {
@@ -811,7 +917,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
- /** Returns the specified table as a [[DataFrame]]. */
+ /**
+ * Returns the specified table as a [[DataFrame]].
+ *
+ * @group ddl_ops
+ */
def table(tableName: String): DataFrame =
DataFrame(this, catalog.lookupRelation(Seq(tableName)))
@@ -819,6 +929,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Returns a [[DataFrame]] containing names of existing tables in the current database.
* The returned DataFrame has two columns, tableName and isTemporary (a Boolean
* indicating if a table is a temporary one or not).
+ *
+ * @group ddl_ops
*/
def tables(): DataFrame = {
DataFrame(this, ShowTablesCommand(None))
@@ -828,6 +940,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Returns a [[DataFrame]] containing names of existing tables in the given database.
* The returned DataFrame has two columns, tableName and isTemporary (a Boolean
* indicating if a table is a temporary one or not).
+ *
+ * @group ddl_ops
*/
def tables(databaseName: String): DataFrame = {
DataFrame(this, ShowTablesCommand(Some(databaseName)))
@@ -835,6 +949,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Returns the names of tables in the current database as an array.
+ *
+ * @group ddl_ops
*/
def tableNames(): Array[String] = {
catalog.getTables(None).map {
@@ -844,6 +960,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Returns the names of tables in the given database as an array.
+ *
+ * @group ddl_ops
*/
def tableNames(databaseName: String): Array[String] = {
catalog.getTables(Some(databaseName)).map {
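For the column-partitioned `jdbc` overload renamed above, a hedged usage sketch (the bounds, column, and connection details are placeholders):

{{{
// Rows are fetched in numPartitions parallel range scans over the integral
// column "id", splitting the range [lowerBound, upperBound] evenly.
val people = sqlContext.jdbc(
  url = "jdbc:h2:mem:testdb",
  table = "PEOPLE",
  columnName = "id",
  lowerBound = 0L,
  upperBound = 10000L,
  numPartitions = 4)
}}}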
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
index ee94a5fdbe..295db539ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types.DataType
* df.select( predict(df("score")) )
* }}}
*/
-case class UserDefinedFunction(f: AnyRef, dataType: DataType) {
+case class UserDefinedFunction protected[sql] (f: AnyRef, dataType: DataType) {
def apply(exprs: Column*): Column = {
Column(ScalaUdf(f, dataType, exprs.map(_.expr)))
@@ -58,6 +58,7 @@ private[sql] case class UserDefinedPythonFunction(
accumulator: Accumulator[JList[Array[Byte]]],
dataType: DataType) {
+ /** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */
def apply(exprs: Column*): Column = {
val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars,
accumulator, dataType, exprs.map(_.expr))
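One way a `UserDefinedFunction` typically reaches user code is through the `udf` registration hook on `SQLContext` shown earlier in this patch; a hedged sketch, with the function name and logic purely illustrative:

{{{
// UDFRegistration.register is assumed here to hand back a UserDefinedFunction,
// whose apply(exprs: Column*) wraps the call in a ScalaUdf expression.
val strLen = sqlContext.udf.register("strLen", (s: String) => s.length)
df.select(strLen(df("name")))
}}}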
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala
new file mode 100644
index 0000000000..cbbd005228
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/package.scala
@@ -0,0 +1,23 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+/**
+ * Contains API classes that are specific to a single language (i.e. Java).
+ */
+package object api
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index c6cd6eb6a2..7c92e9fc88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -144,7 +144,7 @@ case class CacheTableCommand(
override def run(sqlContext: SQLContext) = {
plan.foreach { logicalPlan =>
- sqlContext.registerRDDAsTable(DataFrame(sqlContext, logicalPlan), tableName)
+ sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)
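[Note] A hedged sketch of the renamed hook this command now uses. The helper appears to be package-private, so only code inside org.apache.spark.sql (such as this command) calls it; `logicalPlan` and "my_table" below are hypothetical.

    // Expose a logical plan as a temporary table, then cache it, mirroring CacheTableCommand.
    val df = DataFrame(sqlContext, logicalPlan)
    sqlContext.registerDataFrameAsTable(df, "my_table")
    sqlContext.cacheTable("my_table")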
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index acef49aabf..73162b22fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -32,7 +32,9 @@ import org.apache.spark.sql.types._
*
* Usage:
* {{{
- * sql("SELECT key FROM src").debug
+ * import org.apache.spark.sql.execution.debug._
+ * sql("SELECT key FROM src").debug()
+ * dataFrame.typeCheck()
* }}}
*/
package object debug {
@@ -144,11 +146,9 @@ package object debug {
}
/**
- * :: DeveloperApi ::
* Helper functions for checking that runtime types match a given schema.
*/
- @DeveloperApi
- object TypeCheck {
+ private[sql] object TypeCheck {
def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
case (null, _) =>
@@ -174,10 +174,8 @@ package object debug {
}
/**
- * :: DeveloperApi ::
* Augments [[DataFrame]]s with debug methods.
*/
- @DeveloperApi
private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
import TypeCheck._
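[Note] A minimal sketch of the debug helpers as documented in the usage block above, assuming a SQLContext named `sqlContext` and a registered table "src" already exist.

    import org.apache.spark.sql.execution.debug._

    val df = sqlContext.sql("SELECT key FROM src")
    df.debug()      // runs the plan and reports per-operator debugging output
    df.typeCheck()  // re-runs the plan, verifying runtime values against the declared schema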
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index 34a83f0a5d..34f864f5fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.jdbc.{JDBCPartitioningInfo, JDBCRelation, JDBCPartit
import org.apache.spark.sql.types._
package object jdbc {
- object JDBCWriteDetails extends Logging {
+ private[sql] object JDBCWriteDetails extends Logging {
/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
- private def insertStatement(conn: Connection, table: String, rddSchema: StructType):
+ def insertStatement(conn: Connection, table: String, rddSchema: StructType):
PreparedStatement = {
val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
var fieldsLeft = rddSchema.fields.length
@@ -56,7 +56,7 @@ package object jdbc {
* non-Serializable. Instead, we explicitly close over all variables that
* are used.
*/
- private[jdbc] def savePartition(url: String, table: String, iterator: Iterator[Row],
+ def savePartition(url: String, table: String, iterator: Iterator[Row],
rddSchema: StructType, nullTypes: Array[Int]): Iterator[Byte] = {
val conn = DriverManager.getConnection(url)
var committed = false
@@ -117,19 +117,14 @@ package object jdbc {
}
Array[Byte]().iterator
}
- }
- /**
- * Make it so that you can call createJDBCTable and insertIntoJDBC on a DataFrame.
- */
- implicit class JDBCDataFrame(rdd: DataFrame) {
/**
* Compute the schema string for this RDD.
*/
- private def schemaString(url: String): String = {
+ def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val quirks = DriverQuirks.get(url)
- rdd.schema.fields foreach { field => {
+ df.schema.fields foreach { field => {
val name = field.name
var typ: String = quirks.getJDBCType(field.dataType)._1
if (typ == null) typ = field.dataType match {
@@ -156,9 +151,9 @@ package object jdbc {
/**
* Saves the RDD to the database in a single transaction.
*/
- private def saveTable(url: String, table: String) {
+ def saveTable(df: DataFrame, url: String, table: String) {
val quirks = DriverQuirks.get(url)
- var nullTypes: Array[Int] = rdd.schema.fields.map(field => {
+ var nullTypes: Array[Int] = df.schema.fields.map(field => {
var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
if (nullType.isEmpty) {
field.dataType match {
@@ -175,61 +170,16 @@ package object jdbc {
case DateType => java.sql.Types.DATE
case DecimalType.Unlimited => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
- s"Can't translate null value for field $field")
+ s"Can't translate null value for field $field")
}
} else nullType.get
}).toArray
- val rddSchema = rdd.schema
- rdd.mapPartitions(iterator => JDBCWriteDetails.savePartition(
- url, table, iterator, rddSchema, nullTypes)).collect()
- }
-
- /**
- * Save this RDD to a JDBC database at `url` under the table name `table`.
- * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
- * If you pass `true` for `allowExisting`, it will drop any table with the
- * given name; if you pass `false`, it will throw if the table already
- * exists.
- */
- def createJDBCTable(url: String, table: String, allowExisting: Boolean) {
- val conn = DriverManager.getConnection(url)
- try {
- if (allowExisting) {
- val sql = s"DROP TABLE IF EXISTS $table"
- conn.prepareStatement(sql).executeUpdate()
- }
- val schema = schemaString(url)
- val sql = s"CREATE TABLE $table ($schema)"
- conn.prepareStatement(sql).executeUpdate()
- } finally {
- conn.close()
+ val rddSchema = df.schema
+ df.foreachPartition { iterator =>
+ JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes)
}
- saveTable(url, table)
}
- /**
- * Save this RDD to a JDBC database at `url` under the table name `table`.
- * Assumes the table already exists and has a compatible schema. If you
- * pass `true` for `overwrite`, it will `TRUNCATE` the table before
- * performing the `INSERT`s.
- *
- * The table must already exist on the database. It must have a schema
- * that is compatible with the schema of this RDD; inserting the rows of
- * the RDD in order via the simple statement
- * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
- */
- def insertIntoJDBC(url: String, table: String, overwrite: Boolean) {
- if (overwrite) {
- val conn = DriverManager.getConnection(url)
- try {
- val sql = s"TRUNCATE TABLE $table"
- conn.prepareStatement(sql).executeUpdate()
- } finally {
- conn.close()
- }
- }
- saveTable(url, table)
- }
- } // implicit class JDBCDataFrame
+ }
} // package object jdbc
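[Note] A hedged sketch of how the refactored helpers are now invoked from inside org.apache.spark.sql; they are private[sql], so user code cannot call them directly. `df`, the connection string, and the table name are hypothetical.

    import org.apache.spark.sql.jdbc.JDBCWriteDetails

    val url = "jdbc:h2:mem:testdb"                       // hypothetical connection string
    val ddl = JDBCWriteDetails.schemaString(df, url)     // column definitions for a CREATE TABLE
    JDBCWriteDetails.saveTable(df, url, "PEOPLE_COPY")   // calls savePartition for every partition of df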
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 7dd8bea49b..65966458eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -55,7 +55,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
* Parquet table scan operator. Imports the file that backs the given
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
-case class ParquetTableScan(
+private[sql] case class ParquetTableScan(
attributes: Seq[Attribute],
relation: ParquetRelation,
columnPruningPred: Seq[Expression])
@@ -210,7 +210,7 @@ case class ParquetTableScan(
* (only detected via filename pattern so will not catch all cases).
*/
@DeveloperApi
-case class InsertIntoParquetTable(
+private[sql] case class InsertIntoParquetTable(
relation: ParquetRelation,
child: SparkPlan,
overwrite: Boolean = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index d0856df8d4..052728c5d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
* convenient to use tuples rather than special case classes when writing test cases/suites.
* Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
-trait ParquetTest {
+private[sql] trait ParquetTest {
val sqlContext: SQLContext
import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
@@ -121,7 +121,7 @@ trait ParquetTest {
(data: Seq[T], tableName: String)
(f: => Unit): Unit = {
withParquetRDD(data) { rdd =>
- sqlContext.registerRDDAsTable(rdd, tableName)
+ sqlContext.registerDataFrameAsTable(rdd, tableName)
withTempTable(tableName)(f)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9bb34e2df9..95bea92011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -72,7 +72,7 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWrita
* null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
* in Hive.
*/
-class DefaultSource
+private[sql] class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {
@@ -147,7 +147,7 @@ private[sql] case class PartitionSpec(partitionColumns: StructType, partitions:
* discovery.
*/
@DeveloperApi
-case class ParquetRelation2(
+private[sql] case class ParquetRelation2(
paths: Seq[String],
parameters: Map[String, String],
maybeSchema: Option[StructType] = None,
@@ -600,7 +600,7 @@ case class ParquetRelation2(
}
}
-object ParquetRelation2 {
+private[sql] object ParquetRelation2 {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "mergeSchema"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
index 8871616844..e24475292c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
@@ -53,7 +53,7 @@ private[parquet] class NanoTime extends Serializable {
"NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
}
-object NanoTime {
+private[sql] object NanoTime {
def fromBinary(bytes: Binary): NanoTime = {
Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
val buf = bytes.toByteBuffer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index dd8b3d211b..5020689f7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -374,7 +374,7 @@ private[sql] case class CreateTempTableUsing(
def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
- sqlContext.registerRDDAsTable(
+ sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
Seq.empty
}
@@ -390,7 +390,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
def run(sqlContext: SQLContext) = {
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
- sqlContext.registerRDDAsTable(
+ sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
Seq.empty
diff --git a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java b/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java
deleted file mode 100644
index 80bd74f5b5..0000000000
--- a/sql/core/src/test/java/org/apache/spark/sql/jdbc/JavaJDBCTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc;
-
-import org.junit.*;
-import static org.junit.Assert.*;
-import java.sql.Connection;
-import java.sql.DriverManager;
-
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.api.java.*;
-import org.apache.spark.sql.test.TestSQLContext$;
-
-public class JavaJDBCTest {
- static String url = "jdbc:h2:mem:testdb1";
-
- static Connection conn = null;
-
- // This variable will always be null if TestSQLContext is intact when running
- // these tests. Some Java tests do not play nicely with others, however;
- // they create a SparkContext of their own at startup and stop it at exit.
- // This renders TestSQLContext inoperable, meaning we have to do the same
- // thing. If this variable is nonnull, that means we allocated a
- // SparkContext of our own and that we need to stop it at teardown.
- static JavaSparkContext localSparkContext = null;
-
- static SQLContext sql = TestSQLContext$.MODULE$;
-
- @Before
- public void beforeTest() throws Exception {
- if (SparkEnv.get() == null) { // A previous test destroyed TestSQLContext.
- localSparkContext = new JavaSparkContext("local", "JavaAPISuite");
- sql = new SQLContext(localSparkContext);
- }
- Class.forName("org.h2.Driver");
- conn = DriverManager.getConnection(url);
- conn.prepareStatement("create schema test").executeUpdate();
- conn.prepareStatement("create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate();
- conn.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate();
- conn.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate();
- conn.prepareStatement("insert into test.people values ('joe', 3)").executeUpdate();
- conn.commit();
- }
-
- @After
- public void afterTest() throws Exception {
- if (localSparkContext != null) {
- localSparkContext.stop();
- localSparkContext = null;
- }
- try {
- conn.close();
- } finally {
- conn = null;
- }
- }
-
- @Test
- public void basicTest() throws Exception {
- DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
- Row[] rows = rdd.collect();
- assertEquals(rows.length, 3);
- }
-
- @Test
- public void partitioningTest() throws Exception {
- String[] parts = new String[2];
- parts[0] = "THEID < 2";
- parts[1] = "THEID = 2"; // Deliberately forget about one of them.
- DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE", parts);
- Row[] rows = rdd.collect();
- assertEquals(rows.length, 2);
- }
-
- @Test
- public void writeTest() throws Exception {
- DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
- JDBCUtils.createJDBCTable(rdd, url, "TEST.PEOPLECOPY", false);
- DataFrame rdd2 = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLECOPY");
- Row[] rows = rdd2.collect();
- assertEquals(rows.length, 3);
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d25c1390db..07db672217 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -164,17 +164,16 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("Basic API") {
- assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE").collect.size == 3)
+ assert(TestSQLContext.jdbc(url, "TEST.PEOPLE").collect.size == 3)
}
test("Partitioning via JDBCPartitioningInfo API") {
- val parts = JDBCPartitioningInfo("THEID", 0, 4, 3)
- assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
+ assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3).collect.size == 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
- assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
+ assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", parts).collect.size == 3)
}
test("H2 integral types") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 21e7093610..ad2fbc3f04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -57,8 +57,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").count)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").collect()(0).length)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
}
test("CREATE with overwrite") {
@@ -66,12 +66,12 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "TEST.DROPTEST", false)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
- assert(3 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
+ assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
srdd2.createJDBCTable(url, "TEST.DROPTEST", true)
- assert(1 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
+ assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
}
test("CREATE then INSERT to append") {
@@ -80,8 +80,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
srdd.createJDBCTable(url, "TEST.APPENDTEST", false)
srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
- assert(3 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").count)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").collect()(0).length)
+ assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
}
test("CREATE then INSERT to truncate") {
@@ -90,8 +90,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false)
srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true)
- assert(1 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").count)
- assert(2 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").collect()(0).length)
+ assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count)
+ assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length)
}
test("Incompatible INSERT to append") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
index 89920f2650..4f38110c80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
@@ -143,7 +143,7 @@ class MySQLDatabase {
}
test("Basic test") {
- val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl")
+ val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
val rows = rdd.collect
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -153,7 +153,7 @@ class MySQLDatabase {
}
test("Numeric types") {
- val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
+ val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -181,7 +181,7 @@ class MySQLDatabase {
}
test("Date types") {
- val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
+ val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -199,7 +199,7 @@ class MySQLDatabase {
}
test("String types") {
- val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+ val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -225,9 +225,9 @@ class MySQLDatabase {
}
test("Basic write test") {
- val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
- val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
- val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+ val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
+ val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
+ val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false)
rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
index c174d7adb7..7b47feeb78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
@@ -113,7 +113,7 @@ class PostgresDatabase {
}
test("Type mapping for various types") {
- val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+ val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -142,7 +142,7 @@ class PostgresDatabase {
}
test("Basic write test") {
- val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+ val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
rdd.createJDBCTable(url(db.ip), "public.barcopy", false)
// Test only that it doesn't bomb out.
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index bfacc51ef5..07b5a84fb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.SQLContext
/**
- * Implementation for "describe [extended] table".
- *
* :: DeveloperApi ::
+ *
+ * Implementation for "describe [extended] table".
*/
@DeveloperApi
case class DescribeHiveTableCommand(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0aa5f7f7b8..6afd8eea05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -102,6 +102,10 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
case class CreateMetastoreDataSource(
tableName: String,
userSpecifiedSchema: Option[StructType],
@@ -141,6 +145,10 @@ case class CreateMetastoreDataSource(
}
}
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
case class CreateMetastoreDataSourceAsSelect(
tableName: String,
provider: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala
index 86bb67ec74..4989c42e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JavaJDBCTrampoline.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.sql.jdbc
+package org.apache.spark.sql.hive
-import org.apache.spark.sql.DataFrame
-
-private[jdbc] class JavaJDBCTrampoline {
- def createJDBCTable(rdd: DataFrame, url: String, table: String, allowExisting: Boolean) {
- rdd.createJDBCTable(url, table, allowExisting);
- }
-
- def insertIntoJDBC(rdd: DataFrame, url: String, table: String, overwrite: Boolean) {
- rdd.insertIntoJDBC(url, table, overwrite);
- }
-}
+/**
+ * Physical execution operators used for running queries against data stored in Hive. These
+ * are not intended for use by users, but are documented so that it is easier to understand
+ * the output of EXPLAIN queries.
+ */
+package object execution
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala
index a6c8ed4f7e..db074361ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/package.scala
@@ -17,4 +17,14 @@
package org.apache.spark.sql
+/**
+ * Support for running Spark SQL queries using functionality from Apache Hive (does not require an
+ * existing Hive installation). Supported Hive features include:
+ * - Using HiveQL to express queries.
+ * - Reading metadata from the Hive Metastore using HiveSerDes.
+ * - Hive UDFs, UDAFs, and UDTFs
+ *
+ * Users that would like access to this functionality should create a
+ * [[hive.HiveContext HiveContext]] instead of a [[SQLContext]].
+ */
package object hive
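[Note] A minimal sketch of the recommended entry point, assuming an existing SparkContext `sc`; as the scaladoc notes, no separate Hive installation is required.

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    hiveContext.sql("SELECT key, value FROM src LIMIT 10").collect().foreach(println)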
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
deleted file mode 100644
index 2a16c9d1a2..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.parquet
-
-import java.util.Properties
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
-import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
-import org.apache.hadoop.io.Writable
-
-/**
- * A placeholder that allows Spark SQL users to create metastore tables that are stored as
- * parquet files. It is only intended to pass the checks that the serde is valid and exists
- * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
- * when "spark.sql.hive.convertMetastoreParquet" is set to true.
- */
-@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
- "placeholder in the Hive MetaStore", "1.2.0")
-class FakeParquetSerDe extends SerDe {
- override def getObjectInspector: ObjectInspector = new ObjectInspector {
- override def getCategory: Category = Category.PRIMITIVE
-
- override def getTypeName: String = "string"
- }
-
- override def deserialize(p1: Writable): AnyRef = throwError
-
- override def initialize(p1: Configuration, p2: Properties): Unit = {}
-
- override def getSerializedClass: Class[_ <: Writable] = throwError
-
- override def getSerDeStats: SerDeStats = throwError
-
- override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
-
- private def throwError =
- sys.error(
- "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
-}
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 8534c7d706..30646ddbc2 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -43,7 +43,9 @@ import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.sql.types.{Decimal, DecimalType}
-case class HiveFunctionWrapper(functionClassName: String) extends java.io.Serializable {
+private[hive] case class HiveFunctionWrapper(functionClassName: String)
+ extends java.io.Serializable {
+
// for Serialization
def this() = this(null)
@@ -249,6 +251,9 @@ private[hive] object HiveShim {
def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {}
}
-class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
+private[hive] class ShimFileSinkDesc(
+ var dir: String,
+ var tableInfo: TableDesc,
+ var compressed: Boolean)
extends FileSinkDesc(dir, tableInfo, compressed) {
}
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index 72104f5b55..f9fcbdae15 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -56,7 +56,9 @@ import org.apache.spark.sql.types.{Decimal, DecimalType}
*
* @param functionClassName UDF class name
*/
-case class HiveFunctionWrapper(var functionClassName: String) extends java.io.Externalizable {
+private[hive] case class HiveFunctionWrapper(var functionClassName: String)
+ extends java.io.Externalizable {
+
// for Serialization
def this() = this(null)
@@ -423,7 +425,10 @@ private[hive] object HiveShim {
 * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
 * Fix it through a wrapper.
*/
-class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
+private[hive] class ShimFileSinkDesc(
+ var dir: String,
+ var tableInfo: TableDesc,
+ var compressed: Boolean)
extends Serializable with Logging {
var compressCodec: String = _
var compressType: String = _