author    Reynold Xin <rxin@databricks.com>    2015-08-10 13:49:23 -0700
committer Reynold Xin <rxin@databricks.com>    2015-08-10 13:49:23 -0700
commit    40ed2af587cedadc6e5249031857a922b3b234ca (patch)
tree      195083684234805d6886b6dbdc8711d26d260858 /sql
parent    0fe66744f16854fc8cd8a72174de93a788e3cf6c (diff)
[SPARK-9763][SQL] Minimize exposure of internal SQL classes.
There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. In order to maintain backward compatibility from 1, added a backward compatibility translation map in data source resolution.
3. Moved ui and metric package into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName.
6. Added "override" modifier on shortName.
7. Removed IntSQLMetric.

Author: Reynold Xin <rxin@databricks.com>

Closes #8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
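For context on items 2 and 5 above, a minimal usage sketch (not part of this commit) of how the backward-compatibility map and the new DataSourceRegister.shortName() alias look from the caller's side; it assumes a Spark 1.5-era build and an illustrative local file people.json:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DataSourceAliasSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("alias-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Pre-1.5 provider name: ResolvedDataSource.lookupDataSource translates it to the relocated
    // org.apache.spark.sql.execution.datasources.json.DefaultSource, so existing code keeps working.
    val viaOldName = sqlContext.read.format("org.apache.spark.sql.json").load("people.json")

    // Short alias advertised through DataSourceRegister.shortName() (formerly format()).
    val viaShortName = sqlContext.read.format("json").load("people.json")

    assert(viaOldName.schema == viaShortName.schema)
    sc.stop()
  }
}

Either spelling resolves to the same DefaultSource class, which is the point of the compatibility map added to ResolvedDataSource below.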
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister | 6
-rw-r--r--  sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css (renamed from sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css) | 0
-rw-r--r--  sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js (renamed from sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js) | 0
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala | 185
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala | 64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala | 204
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 352
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala | 62
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala | 60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala | 48
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala) | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala) | 41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 219
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala) | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/json/JacksonUtils.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/metric/SQLMetrics.scala) | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala) | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala) | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala | 52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala | 250
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 15
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java (renamed from sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/CompatibilityTest.java) | 4
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java (renamed from sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/Nested.java) | 30
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java (renamed from sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java) | 106
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala) | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala) | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala) | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala) | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala) | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/ui/SQLListenerSuite.scala) | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala | 59
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 39
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala | 2
75 files changed, 1093 insertions, 963 deletions
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cc32d4b727..ca50000b47 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1,3 @@
-org.apache.spark.sql.jdbc.DefaultSource
-org.apache.spark.sql.json.DefaultSource
-org.apache.spark.sql.parquet.DefaultSource
+org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
+org.apache.spark.sql.execution.datasources.json.DefaultSource
+org.apache.spark.sql.execution.datasources.parquet.DefaultSource
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index ddd3a91dd8..ddd3a91dd8 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
index 5161fcde66..5161fcde66 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 570b8b2d59..27b994f1f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 85f33c5e99..9ea955b010 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,10 +25,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
-import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2a4992db09..5fa11da4c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,8 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
-import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -264,7 +264,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
// Create the table if the table didn't exist.
if (!tableExists) {
- val schema = JDBCWriteDetails.schemaString(df, url)
+ val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
}
@@ -272,7 +272,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close()
}
- JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
+ JdbcUtils.saveTable(df, url, table, connectionProperties)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 832572571c..f73bb0488c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
-import org.apache.spark.sql.ui.{SQLListener, SQLTab}
+import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 97f1323e97..cee58218a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.util.Utils
private[sql] object SQLExecution {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1915496d16..9ba5cf2d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.metric.{IntSQLMetric, LongSQLMetric, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.DataType
object SparkPlan {
@@ -99,12 +99,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
/**
- * Return a IntSQLMetric according to the name.
- */
- private[sql] def intMetric(name: String): IntSQLMetric =
- metrics(name).asInstanceOf[IntSQLMetric]
-
- /**
* Return a LongSQLMetric according to the name.
*/
private[sql] def longMetric(name: String): LongSQLMetric =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 24950f2606..bf2de244c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
new file mode 100644
index 0000000000..6c462fa304
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -0,0 +1,185 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.language.implicitConversions
+import scala.util.matching.Regex
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.{TableIdentifier, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types._
+
+
+/**
+ * A parser for foreign DDL commands.
+ */
+class DDLParser(parseQuery: String => LogicalPlan)
+ extends AbstractSparkSQLParser with DataTypeParser with Logging {
+
+ def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
+ try {
+ parse(input)
+ } catch {
+ case ddlException: DDLException => throw ddlException
+ case _ if !exceptionOnError => parseQuery(input)
+ case x: Throwable => throw x
+ }
+ }
+
+ // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
+ // properties of this class via reflection at runtime to construct the SqlLexical object
+ protected val CREATE = Keyword("CREATE")
+ protected val TEMPORARY = Keyword("TEMPORARY")
+ protected val TABLE = Keyword("TABLE")
+ protected val IF = Keyword("IF")
+ protected val NOT = Keyword("NOT")
+ protected val EXISTS = Keyword("EXISTS")
+ protected val USING = Keyword("USING")
+ protected val OPTIONS = Keyword("OPTIONS")
+ protected val DESCRIBE = Keyword("DESCRIBE")
+ protected val EXTENDED = Keyword("EXTENDED")
+ protected val AS = Keyword("AS")
+ protected val COMMENT = Keyword("COMMENT")
+ protected val REFRESH = Keyword("REFRESH")
+
+ protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
+
+ protected def start: Parser[LogicalPlan] = ddl
+
+ /**
+ * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
+ * USING org.apache.spark.sql.avro
+ * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+ * or
+ * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
+ * USING org.apache.spark.sql.avro
+ * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+ * or
+ * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
+ * USING org.apache.spark.sql.avro
+ * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+ * AS SELECT ...
+ */
+ protected lazy val createTable: Parser[LogicalPlan] = {
+ // TODO: Support database.table.
+ (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+ tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+ case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+ if (temp.isDefined && allowExisting.isDefined) {
+ throw new DDLException(
+ "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+ }
+
+ val options = opts.getOrElse(Map.empty[String, String])
+ if (query.isDefined) {
+ if (columns.isDefined) {
+ throw new DDLException(
+ "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+ }
+ // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
+ val mode = if (allowExisting.isDefined) {
+ SaveMode.Ignore
+ } else if (temp.isDefined) {
+ SaveMode.Overwrite
+ } else {
+ SaveMode.ErrorIfExists
+ }
+
+ val queryPlan = parseQuery(query.get)
+ CreateTableUsingAsSelect(tableName,
+ provider,
+ temp.isDefined,
+ Array.empty[String],
+ mode,
+ options,
+ queryPlan)
+ } else {
+ val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema,
+ provider,
+ temp.isDefined,
+ options,
+ allowExisting.isDefined,
+ managedIfNoPath = false)
+ }
+ }
+ }
+
+ protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+
+ /*
+ * describe [extended] table avroTable
+ * This will display all columns of table `avroTable`, including column_name, column_type and comment
+ */
+ protected lazy val describeTable: Parser[LogicalPlan] =
+ (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+ case e ~ db ~ tbl =>
+ val tblIdentifier = db match {
+ case Some(dbName) =>
+ Seq(dbName, tbl)
+ case None =>
+ Seq(tbl)
+ }
+ DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
+ }
+
+ protected lazy val refreshTable: Parser[LogicalPlan] =
+ REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+ case maybeDatabaseName ~ tableName =>
+ RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
+ }
+
+ protected lazy val options: Parser[Map[String, String]] =
+ "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
+
+ protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
+
+ override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
+ s"identifier matching regex $regex", {
+ case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
+ case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
+ }
+ )
+
+ protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
+ case name => name
+ }
+
+ protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
+ case parts => parts.mkString(".")
+ }
+
+ protected lazy val pair: Parser[(String, String)] =
+ optionName ~ stringLit ^^ { case k ~ v => (k, v) }
+
+ protected lazy val column: Parser[StructField] =
+ ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
+ val meta = cm match {
+ case Some(comment) =>
+ new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
+ case None => Metadata.empty
+ }
+
+ StructField(columnName, typ, nullable = true, meta)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala
new file mode 100644
index 0000000000..6e4cc4de7f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.Properties
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCRelation, JDBCPartitioningInfo, DriverRegistry}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+
+
+class DefaultSource extends RelationProvider with DataSourceRegister {
+
+ override def shortName(): String = "jdbc"
+
+ /** Returns a new base relation with the given parameters. */
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+ val driver = parameters.getOrElse("driver", null)
+ val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+ val partitionColumn = parameters.getOrElse("partitionColumn", null)
+ val lowerBound = parameters.getOrElse("lowerBound", null)
+ val upperBound = parameters.getOrElse("upperBound", null)
+ val numPartitions = parameters.getOrElse("numPartitions", null)
+
+ if (driver != null) DriverRegistry.register(driver)
+
+ if (partitionColumn != null
+ && (lowerBound == null || upperBound == null || numPartitions == null)) {
+ sys.error("Partitioning incompletely specified")
+ }
+
+ val partitionInfo = if (partitionColumn == null) {
+ null
+ } else {
+ JDBCPartitioningInfo(
+ partitionColumn,
+ lowerBound.toLong,
+ upperBound.toLong,
+ numPartitions.toInt)
+ }
+ val parts = JDBCRelation.columnPartition(partitionInfo)
+ val properties = new Properties() // Additional properties that we will pass to getConnection
+ parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+ JDBCRelation(url, table, parts, properties)(sqlContext)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index 6ccde7693b..3b7dc2e8d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -17,27 +17,10 @@
package org.apache.spark.sql.execution.datasources
-import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.{Utils, SerializableConfiguration}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources.InsertableRelation
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
new file mode 100644
index 0000000000..7770bbd712
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConversions._
+import scala.language.{existentials, implicitConversions}
+import scala.util.{Success, Failure, Try}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{DataFrame, SaveMode, AnalysisException, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.util.Utils
+
+
+case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+
+object ResolvedDataSource extends Logging {
+
+ /** A map to maintain backward compatibility in case we move data sources around. */
+ private val backwardCompatibilityMap = Map(
+ "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
+ "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
+ )
+
+ /** Given a provider name, look up the data source class definition. */
+ def lookupDataSource(provider0: String): Class[_] = {
+ val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
+ val provider2 = s"$provider.DefaultSource"
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+ serviceLoader.iterator().filter(_.shortName().equalsIgnoreCase(provider)).toList match {
+ /** the provider format did not match any given registered aliases */
+ case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+ case Success(dataSource) => dataSource
+ case Failure(error) =>
+ if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+ throw new ClassNotFoundException(
+ "The ORC data source must be used with Hive support enabled.", error)
+ } else {
+ throw new ClassNotFoundException(
+ s"Failed to load class for data source: $provider.", error)
+ }
+ }
+ /** there is exactly one registered alias */
+ case head :: Nil => head.getClass
+ /** There are multiple registered aliases for the input */
+ case sources => sys.error(s"Multiple sources found for $provider, " +
+ s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+ "please specify the fully qualified class name.")
+ }
+ }
+
+ /** Create a [[ResolvedDataSource]] for reading data in. */
+ def apply(
+ sqlContext: SQLContext,
+ userSpecifiedSchema: Option[StructType],
+ partitionColumns: Array[String],
+ provider: String,
+ options: Map[String, String]): ResolvedDataSource = {
+ val clazz: Class[_] = lookupDataSource(provider)
+ def className: String = clazz.getCanonicalName
+ val relation = userSpecifiedSchema match {
+ case Some(schema: StructType) => clazz.newInstance() match {
+ case dataSource: SchemaRelationProvider =>
+ dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+ case dataSource: HadoopFsRelationProvider =>
+ val maybePartitionsSchema = if (partitionColumns.isEmpty) {
+ None
+ } else {
+ Some(partitionColumnsSchema(schema, partitionColumns))
+ }
+
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val paths = {
+ val patternPath = new Path(caseInsensitiveOptions("path"))
+ val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+ }
+
+ val dataSchema =
+ StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
+
+ dataSource.createRelation(
+ sqlContext,
+ paths,
+ Some(dataSchema),
+ maybePartitionsSchema,
+ caseInsensitiveOptions)
+ case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+ throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ case _ =>
+ throw new AnalysisException(s"$className is not a RelationProvider.")
+ }
+
+ case None => clazz.newInstance() match {
+ case dataSource: RelationProvider =>
+ dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+ case dataSource: HadoopFsRelationProvider =>
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val paths = {
+ val patternPath = new Path(caseInsensitiveOptions("path"))
+ val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+ }
+ dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
+ case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+ throw new AnalysisException(
+ s"A schema needs to be specified when using $className.")
+ case _ =>
+ throw new AnalysisException(
+ s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
+ }
+ }
+ new ResolvedDataSource(clazz, relation)
+ }
+
+ private def partitionColumnsSchema(
+ schema: StructType,
+ partitionColumns: Array[String]): StructType = {
+ StructType(partitionColumns.map { col =>
+ schema.find(_.name == col).getOrElse {
+ throw new RuntimeException(s"Partition column $col not found in schema $schema")
+ }
+ }).asNullable
+ }
+
+ /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
+ def apply(
+ sqlContext: SQLContext,
+ provider: String,
+ partitionColumns: Array[String],
+ mode: SaveMode,
+ options: Map[String, String],
+ data: DataFrame): ResolvedDataSource = {
+ if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+ throw new AnalysisException("Cannot save interval data type into external storage.")
+ }
+ val clazz: Class[_] = lookupDataSource(provider)
+ val relation = clazz.newInstance() match {
+ case dataSource: CreatableRelationProvider =>
+ dataSource.createRelation(sqlContext, mode, options, data)
+ case dataSource: HadoopFsRelationProvider =>
+ // Don't glob path for the write path. The contracts here are:
+ // 1. Only one output path can be specified on the write path;
+ // 2. Output path must be a legal HDFS style file system path;
+ // 3. It's OK that the output path doesn't exist yet;
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val outputPath = {
+ val path = new Path(caseInsensitiveOptions("path"))
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+ val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
+ val r = dataSource.createRelation(
+ sqlContext,
+ Array(outputPath.toString),
+ Some(dataSchema.asNullable),
+ Some(partitionColumnsSchema(data.schema, partitionColumns)),
+ caseInsensitiveOptions)
+
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan (partition columns are all moved after data column). This
+ // will be adjusted within InsertIntoHadoopFsRelation.
+ sqlContext.executePlan(
+ InsertIntoHadoopFsRelation(
+ r,
+ data.logicalPlan,
+ mode)).toRdd
+ r
+ case _ =>
+ sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+ }
+ ResolvedDataSource(clazz, relation)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 8c2f297e42..ecd304c30c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,340 +17,12 @@
package org.apache.spark.sql.execution.datasources
-import java.util.ServiceLoader
-
-import scala.collection.Iterator
-import scala.collection.JavaConversions._
-import scala.language.{existentials, implicitConversions}
-import scala.util.{Failure, Success, Try}
-import scala.util.matching.Regex
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, TableIdentifier}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
-
-/**
- * A parser for foreign DDL commands.
- */
-private[sql] class DDLParser(
- parseQuery: String => LogicalPlan)
- extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
- def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
- try {
- parse(input)
- } catch {
- case ddlException: DDLException => throw ddlException
- case _ if !exceptionOnError => parseQuery(input)
- case x: Throwable => throw x
- }
- }
-
- // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
- // properties via reflection the class in runtime for constructing the SqlLexical object
- protected val CREATE = Keyword("CREATE")
- protected val TEMPORARY = Keyword("TEMPORARY")
- protected val TABLE = Keyword("TABLE")
- protected val IF = Keyword("IF")
- protected val NOT = Keyword("NOT")
- protected val EXISTS = Keyword("EXISTS")
- protected val USING = Keyword("USING")
- protected val OPTIONS = Keyword("OPTIONS")
- protected val DESCRIBE = Keyword("DESCRIBE")
- protected val EXTENDED = Keyword("EXTENDED")
- protected val AS = Keyword("AS")
- protected val COMMENT = Keyword("COMMENT")
- protected val REFRESH = Keyword("REFRESH")
-
- protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
- protected def start: Parser[LogicalPlan] = ddl
-
- /**
- * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * AS SELECT ...
- */
- protected lazy val createTable: Parser[LogicalPlan] =
- // TODO: Support database.table.
- (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
- tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
- case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
- if (temp.isDefined && allowExisting.isDefined) {
- throw new DDLException(
- "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
- }
-
- val options = opts.getOrElse(Map.empty[String, String])
- if (query.isDefined) {
- if (columns.isDefined) {
- throw new DDLException(
- "a CREATE TABLE AS SELECT statement does not allow column definitions.")
- }
- // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
- val mode = if (allowExisting.isDefined) {
- SaveMode.Ignore
- } else if (temp.isDefined) {
- SaveMode.Overwrite
- } else {
- SaveMode.ErrorIfExists
- }
-
- val queryPlan = parseQuery(query.get)
- CreateTableUsingAsSelect(tableName,
- provider,
- temp.isDefined,
- Array.empty[String],
- mode,
- options,
- queryPlan)
- } else {
- val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(
- tableName,
- userSpecifiedSchema,
- provider,
- temp.isDefined,
- options,
- allowExisting.isDefined,
- managedIfNoPath = false)
- }
- }
-
- protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
- /*
- * describe [extended] table avroTable
- * This will display all columns of table `avroTable` includes column_name,column_type,comment
- */
- protected lazy val describeTable: Parser[LogicalPlan] =
- (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
- case e ~ db ~ tbl =>
- val tblIdentifier = db match {
- case Some(dbName) =>
- Seq(dbName, tbl)
- case None =>
- Seq(tbl)
- }
- DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
- }
-
- protected lazy val refreshTable: Parser[LogicalPlan] =
- REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
- case maybeDatabaseName ~ tableName =>
- RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
- }
-
- protected lazy val options: Parser[Map[String, String]] =
- "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
- protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
- override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
- s"identifier matching regex $regex", {
- case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
- case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
- }
- )
-
- protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
- case name => name
- }
-
- protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
- case parts => parts.mkString(".")
- }
-
- protected lazy val pair: Parser[(String, String)] =
- optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
- protected lazy val column: Parser[StructField] =
- ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
- val meta = cm match {
- case Some(comment) =>
- new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
- case None => Metadata.empty
- }
-
- StructField(columnName, typ, nullable = true, meta)
- }
-}
-
-private[sql] object ResolvedDataSource extends Logging {
-
- /** Given a provider name, look up the data source class definition. */
- def lookupDataSource(provider: String): Class[_] = {
- val provider2 = s"$provider.DefaultSource"
- val loader = Utils.getContextOrSparkClassLoader
- val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
- serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match {
- /** the provider format did not match any given registered aliases */
- case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
- case Success(dataSource) => dataSource
- case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
- throw new ClassNotFoundException(
- "The ORC data source must be used with Hive support enabled.", error)
- } else {
- throw new ClassNotFoundException(
- s"Failed to load class for data source: $provider", error)
- }
- }
- /** there is exactly one registered alias */
- case head :: Nil => head.getClass
- /** There are multiple registered aliases for the input */
- case sources => sys.error(s"Multiple sources found for $provider, " +
- s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
- "please specify the fully qualified class name")
- }
- }
-
- /** Create a [[ResolvedDataSource]] for reading data in. */
- def apply(
- sqlContext: SQLContext,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- provider: String,
- options: Map[String, String]): ResolvedDataSource = {
- val clazz: Class[_] = lookupDataSource(provider)
- def className: String = clazz.getCanonicalName
- val relation = userSpecifiedSchema match {
- case Some(schema: StructType) => clazz.newInstance() match {
- case dataSource: SchemaRelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
- case dataSource: HadoopFsRelationProvider =>
- val maybePartitionsSchema = if (partitionColumns.isEmpty) {
- None
- } else {
- Some(partitionColumnsSchema(schema, partitionColumns))
- }
-
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
- }
-
- val dataSchema =
- StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-
- dataSource.createRelation(
- sqlContext,
- paths,
- Some(dataSchema),
- maybePartitionsSchema,
- caseInsensitiveOptions)
- case dataSource: org.apache.spark.sql.sources.RelationProvider =>
- throw new AnalysisException(s"$className does not allow user-specified schemas.")
- case _ =>
- throw new AnalysisException(s"$className is not a RelationProvider.")
- }
-
- case None => clazz.newInstance() match {
- case dataSource: RelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
- case dataSource: HadoopFsRelationProvider =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
- }
- dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
- case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
- throw new AnalysisException(
- s"A schema needs to be specified when using $className.")
- case _ =>
- throw new AnalysisException(
- s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
- }
- }
- new ResolvedDataSource(clazz, relation)
- }
-
- private def partitionColumnsSchema(
- schema: StructType,
- partitionColumns: Array[String]): StructType = {
- StructType(partitionColumns.map { col =>
- schema.find(_.name == col).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
- }
- }).asNullable
- }
-
- /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
- def apply(
- sqlContext: SQLContext,
- provider: String,
- partitionColumns: Array[String],
- mode: SaveMode,
- options: Map[String, String],
- data: DataFrame): ResolvedDataSource = {
- if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
- throw new AnalysisException("Cannot save interval data type into external storage.")
- }
- val clazz: Class[_] = lookupDataSource(provider)
- val relation = clazz.newInstance() match {
- case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sqlContext, mode, options, data)
- case dataSource: HadoopFsRelationProvider =>
- // Don't glob path for the write path. The contracts here are:
- // 1. Only one output path can be specified on the write path;
- // 2. Output path must be a legal HDFS style file system path;
- // 3. It's OK that the output path doesn't exist yet;
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val outputPath = {
- val path = new Path(caseInsensitiveOptions("path"))
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
- val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
- val r = dataSource.createRelation(
- sqlContext,
- Array(outputPath.toString),
- Some(dataSchema.asNullable),
- Some(partitionColumnsSchema(data.schema, partitionColumns)),
- caseInsensitiveOptions)
-
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). This
- // will be adjusted within InsertIntoHadoopFsRelation.
- sqlContext.executePlan(
- InsertIntoHadoopFsRelation(
- r,
- data.logicalPlan,
- mode)).toRdd
- r
- case _ =>
- sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
- }
- new ResolvedDataSource(clazz, relation)
- }
-}
-
-private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
@@ -358,11 +30,12 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
-private[sql] case class DescribeCommand(
+case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends LogicalPlan with Command {
override def children: Seq[LogicalPlan] = Seq.empty
+
override val output: Seq[Attribute] = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false,
@@ -370,7 +43,8 @@ private[sql] case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false,
new MetadataBuilder().putString("comment", "data type of the column").build())(),
AttributeReference("comment", StringType, nullable = false,
- new MetadataBuilder().putString("comment", "comment of the column").build())())
+ new MetadataBuilder().putString("comment", "comment of the column").build())()
+ )
}
/**
@@ -378,7 +52,7 @@ private[sql] case class DescribeCommand(
* @param allowExisting If it is true, we will do nothing when the table already exists.
* If it is false, an exception will be thrown
*/
-private[sql] case class CreateTableUsing(
+case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
@@ -397,7 +71,7 @@ private[sql] case class CreateTableUsing(
* can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
-private[sql] case class CreateTableUsingAsSelect(
+case class CreateTableUsingAsSelect(
tableName: String,
provider: String,
temporary: Boolean,
@@ -410,7 +84,7 @@ private[sql] case class CreateTableUsingAsSelect(
// override lazy val resolved = databaseName != None && childrenResolved
}
-private[sql] case class CreateTempTableUsing(
+case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
@@ -425,7 +99,7 @@ private[sql] case class CreateTempTableUsing(
}
}
-private[sql] case class CreateTempTableUsingAsSelect(
+case class CreateTempTableUsingAsSelect(
tableName: String,
provider: String,
partitionColumns: Array[String],
@@ -443,7 +117,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
}
}
-private[sql] case class RefreshTable(tableIdent: TableIdentifier)
+case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -472,7 +146,7 @@ private[sql] case class RefreshTable(tableIdent: TableIdentifier)
/**
* Builds a map in which keys are case insensitive
*/
-protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
with Serializable {
val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
@@ -490,4 +164,4 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St
/**
* The exception thrown from the DDL parser.
*/
-protected[sql] class DDLException(message: String) extends Exception(message)
+class DDLException(message: String) extends RuntimeException(message)
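The now-public CaseInsensitiveMap above lower-cases its keys into baseMap, so option lookup is expected to ignore key casing. A minimal usage sketch, assuming the get/contains overrides (not shown in this hunk) also lower-case the lookup key and that the class stays in org.apache.spark.sql.execution.datasources:

    import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap

    object CaseInsensitiveMapSketch {
      def main(args: Array[String]): Unit = {
        val options = new CaseInsensitiveMap(Map("Path" -> "/tmp/data", "Header" -> "true"))
        // Both lookups should resolve to the same entries regardless of the caller's key casing.
        println(options.get("path"))   // expected: Some(/tmp/data)
        println(options.get("HEADER")) // expected: Some(true)
      }
    }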
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
new file mode 100644
index 0000000000..6773afc794
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.util.Properties
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, DataSourceRegister}
+
+class DefaultSource extends RelationProvider with DataSourceRegister {
+
+ override def shortName(): String = "jdbc"
+
+ /** Returns a new base relation with the given parameters. */
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+ val driver = parameters.getOrElse("driver", null)
+ val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+ val partitionColumn = parameters.getOrElse("partitionColumn", null)
+ val lowerBound = parameters.getOrElse("lowerBound", null)
+ val upperBound = parameters.getOrElse("upperBound", null)
+ val numPartitions = parameters.getOrElse("numPartitions", null)
+
+ if (driver != null) DriverRegistry.register(driver)
+
+ if (partitionColumn != null
+ && (lowerBound == null || upperBound == null || numPartitions == null)) {
+ sys.error("Partitioning incompletely specified")
+ }
+
+ val partitionInfo = if (partitionColumn == null) {
+ null
+ } else {
+ JDBCPartitioningInfo(
+ partitionColumn,
+ lowerBound.toLong,
+ upperBound.toLong,
+ numPartitions.toInt)
+ }
+ val parts = JDBCRelation.columnPartition(partitionInfo)
+ val properties = new Properties() // Additional properties that we will pass to getConnection
+ parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+ JDBCRelation(url, table, parts, properties)(sqlContext)
+ }
+}
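With shortName() returning "jdbc", callers can reference this provider by alias through the public DataFrameReader instead of the now-internal class name. A rough sketch; the URL, table, and driver values below are placeholders, not values from this patch:

    import org.apache.spark.sql.SQLContext

    object JdbcAliasSketch {
      def readViaAlias(sqlContext: SQLContext) = {
        sqlContext.read
          .format("jdbc")                                   // resolved to this DefaultSource
          .option("url", "jdbc:postgresql://localhost/db")  // placeholder JDBC URL
          .option("dbtable", "some_table")                  // placeholder table name
          .option("driver", "org.postgresql.Driver")        // optional; registered via DriverRegistry
          .load()
      }
    }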
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
new file mode 100644
index 0000000000..7ccd61ed46
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Driver, DriverManager}
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * java.sql.DriverManager is always loaded by the bootstrap classloader,
+ * so it cannot see JDBC drivers that are only accessible through Spark's ClassLoader.
+ *
+ * To work around this, drivers from user-supplied jars are wrapped in a thin wrapper.
+ */
+object DriverRegistry extends Logging {
+
+ private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
+
+ def register(className: String): Unit = {
+ val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
+ if (cls.getClassLoader == null) {
+ logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
+ } else if (wrapperMap.get(className).isDefined) {
+ logTrace(s"Wrapper for $className already exists")
+ } else {
+ synchronized {
+ if (wrapperMap.get(className).isEmpty) {
+ val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
+ DriverManager.registerDriver(wrapper)
+ wrapperMap(className) = wrapper
+ logTrace(s"Wrapper for $className registered")
+ }
+ }
+ }
+ }
+
+ def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
+ case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
+ case driver => driver.getClass.getCanonicalName
+ }
+}
+
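A usage sketch for the registry; the driver class name below is a placeholder and must actually be on the classpath for this to run:

    import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry

    object DriverRegistrySketch {
      def main(args: Array[String]): Unit = {
        // Registers a DriverWrapper with java.sql.DriverManager so that the bootstrap-loaded
        // DriverManager can hand out connections for a driver only Spark's classloader can see.
        DriverRegistry.register("org.postgresql.Driver")
        println(DriverRegistry.getDriverClassName("jdbc:postgresql://localhost/db"))
      }
    }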
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala
new file mode 100644
index 0000000000..18263fe227
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Connection, Driver, DriverPropertyInfo, SQLFeatureNotSupportedException}
+import java.util.Properties
+
+/**
+ * A wrapper for a JDBC Driver to work around SPARK-6913.
+ *
+ * The problem is that the `java.sql.DriverManager` class cannot access drivers loaded by
+ * Spark's ClassLoader.
+ */
+class DriverWrapper(val wrapped: Driver) extends Driver {
+ override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
+
+ override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
+
+ override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
+ wrapped.getPropertyInfo(url, info)
+ }
+
+ override def getMinorVersion: Int = wrapped.getMinorVersion
+
+ def getParentLogger: java.util.logging.Logger = {
+ throw new SQLFeatureNotSupportedException(
+ s"${this.getClass.getName}.getParentLogger is not yet implemented.")
+ }
+
+ override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
+
+ override def getMajorVersion: Int = wrapped.getMajorVersion
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 3cf70db6b7..8eab6a0adc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.jdbc
+package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
import java.util.Properties
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -180,9 +181,8 @@ private[sql] object JDBCRDD extends Logging {
try {
if (driver != null) DriverRegistry.register(driver)
} catch {
- case e: ClassNotFoundException => {
- logWarning(s"Couldn't find class $driver", e);
- }
+ case e: ClassNotFoundException =>
+ logWarning(s"Couldn't find class $driver", e)
}
DriverManager.getConnection(url, properties)
}
@@ -344,7 +344,6 @@ private[sql] class JDBCRDD(
}).toArray
}
-
/**
* Runs the SQL query against the JDBC driver.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 48d97ced9c..f9300dc2cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.jdbc
+package org.apache.spark.sql.execution.datasources.jdbc
import java.util.Properties
@@ -77,45 +77,6 @@ private[sql] object JDBCRelation {
}
}
-private[sql] class DefaultSource extends RelationProvider with DataSourceRegister {
-
- def format(): String = "jdbc"
-
- /** Returns a new base relation with the given parameters. */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String]): BaseRelation = {
- val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
- val driver = parameters.getOrElse("driver", null)
- val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
- val partitionColumn = parameters.getOrElse("partitionColumn", null)
- val lowerBound = parameters.getOrElse("lowerBound", null)
- val upperBound = parameters.getOrElse("upperBound", null)
- val numPartitions = parameters.getOrElse("numPartitions", null)
-
- if (driver != null) DriverRegistry.register(driver)
-
- if (partitionColumn != null
- && (lowerBound == null || upperBound == null || numPartitions == null)) {
- sys.error("Partitioning incompletely specified")
- }
-
- val partitionInfo = if (partitionColumn == null) {
- null
- } else {
- JDBCPartitioningInfo(
- partitionColumn,
- lowerBound.toLong,
- upperBound.toLong,
- numPartitions.toInt)
- }
- val parts = JDBCRelation.columnPartition(partitionInfo)
- val properties = new Properties() // Additional properties that we will pass to getConnection
- parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
- JDBCRelation(url, table, parts, properties)(sqlContext)
- }
-}
-
private[sql] case class JDBCRelation(
url: String,
table: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000000..039c13bf16
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.util.Properties
+
+import scala.util.Try
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.jdbc.JdbcDialects
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
+
+/**
+ * Util functions for JDBC tables.
+ */
+object JdbcUtils extends Logging {
+
+ /**
+ * Establishes a JDBC connection.
+ */
+ def createConnection(url: String, connectionProperties: Properties): Connection = {
+ DriverManager.getConnection(url, connectionProperties)
+ }
+
+ /**
+ * Returns true if the table already exists in the JDBC database.
+ */
+ def tableExists(conn: Connection, table: String): Boolean = {
+ // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
+ // SQL database systems, considering "table" could also include the database name.
+ Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
+ }
+
+ /**
+ * Drops a table from the JDBC database.
+ */
+ def dropTable(conn: Connection, table: String): Unit = {
+ conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
+ }
+
+ /**
+ * Returns a PreparedStatement that inserts a row into table via conn.
+ */
+ def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
+ val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
+ var fieldsLeft = rddSchema.fields.length
+ while (fieldsLeft > 0) {
+ sql.append("?")
+ if (fieldsLeft > 1) sql.append(", ") else sql.append(")")
+ fieldsLeft = fieldsLeft - 1
+ }
+ conn.prepareStatement(sql.toString())
+ }
+
+ /**
+ * Saves a partition of a DataFrame to the JDBC database. This is done in
+ * a single database transaction in order to avoid repeatedly inserting
+ * data as much as possible.
+ *
+ * It is still theoretically possible for rows in a DataFrame to be
+ * inserted into the database more than once if a stage somehow fails after
+ * the commit occurs but before the stage can return successfully.
+ *
+ * This is not a closure inside saveTable() because apparently cosmetic
+ * implementation changes elsewhere might easily render such a closure
+ * non-Serializable. Instead, we explicitly close over all variables that
+ * are used.
+ */
+ def savePartition(
+ getConnection: () => Connection,
+ table: String,
+ iterator: Iterator[Row],
+ rddSchema: StructType,
+ nullTypes: Array[Int]): Iterator[Byte] = {
+ val conn = getConnection()
+ var committed = false
+ try {
+ conn.setAutoCommit(false) // Everything in the same db transaction.
+ val stmt = insertStatement(conn, table, rddSchema)
+ try {
+ while (iterator.hasNext) {
+ val row = iterator.next()
+ val numFields = rddSchema.fields.length
+ var i = 0
+ while (i < numFields) {
+ if (row.isNullAt(i)) {
+ stmt.setNull(i + 1, nullTypes(i))
+ } else {
+ rddSchema.fields(i).dataType match {
+ case IntegerType => stmt.setInt(i + 1, row.getInt(i))
+ case LongType => stmt.setLong(i + 1, row.getLong(i))
+ case DoubleType => stmt.setDouble(i + 1, row.getDouble(i))
+ case FloatType => stmt.setFloat(i + 1, row.getFloat(i))
+ case ShortType => stmt.setInt(i + 1, row.getShort(i))
+ case ByteType => stmt.setInt(i + 1, row.getByte(i))
+ case BooleanType => stmt.setBoolean(i + 1, row.getBoolean(i))
+ case StringType => stmt.setString(i + 1, row.getString(i))
+ case BinaryType => stmt.setBytes(i + 1, row.getAs[Array[Byte]](i))
+ case TimestampType => stmt.setTimestamp(i + 1, row.getAs[java.sql.Timestamp](i))
+ case DateType => stmt.setDate(i + 1, row.getAs[java.sql.Date](i))
+ case t: DecimalType => stmt.setBigDecimal(i + 1, row.getDecimal(i))
+ case _ => throw new IllegalArgumentException(
+ s"Can't translate non-null value for field $i")
+ }
+ }
+ i = i + 1
+ }
+ stmt.executeUpdate()
+ }
+ } finally {
+ stmt.close()
+ }
+ conn.commit()
+ committed = true
+ } finally {
+ if (!committed) {
+ // The stage must fail. We got here through an exception path, so
+ // let the exception through unless rollback() or close() want to
+ // tell the user about another problem.
+ conn.rollback()
+ conn.close()
+ } else {
+ // The stage must succeed. We cannot propagate any exception close() might throw.
+ try {
+ conn.close()
+ } catch {
+ case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
+ }
+ }
+ }
+ Array[Byte]().iterator
+ }
+
+ /**
+ * Compute the schema string for this RDD.
+ */
+ def schemaString(df: DataFrame, url: String): String = {
+ val sb = new StringBuilder()
+ val dialect = JdbcDialects.get(url)
+ df.schema.fields foreach { field => {
+ val name = field.name
+ val typ: String =
+ dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
+ field.dataType match {
+ case IntegerType => "INTEGER"
+ case LongType => "BIGINT"
+ case DoubleType => "DOUBLE PRECISION"
+ case FloatType => "REAL"
+ case ShortType => "INTEGER"
+ case ByteType => "BYTE"
+ case BooleanType => "BIT(1)"
+ case StringType => "TEXT"
+ case BinaryType => "BLOB"
+ case TimestampType => "TIMESTAMP"
+ case DateType => "DATE"
+          case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
+ case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
+ })
+ val nullable = if (field.nullable) "" else "NOT NULL"
+ sb.append(s", $name $typ $nullable")
+ }}
+ if (sb.length < 2) "" else sb.substring(2)
+ }
+
+ /**
+ * Saves the RDD to the database in a single transaction.
+ */
+ def saveTable(
+ df: DataFrame,
+ url: String,
+ table: String,
+ properties: Properties = new Properties()) {
+ val dialect = JdbcDialects.get(url)
+ val nullTypes: Array[Int] = df.schema.fields.map { field =>
+ dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
+ field.dataType match {
+ case IntegerType => java.sql.Types.INTEGER
+ case LongType => java.sql.Types.BIGINT
+ case DoubleType => java.sql.Types.DOUBLE
+ case FloatType => java.sql.Types.REAL
+ case ShortType => java.sql.Types.INTEGER
+ case ByteType => java.sql.Types.INTEGER
+ case BooleanType => java.sql.Types.BIT
+ case StringType => java.sql.Types.CLOB
+ case BinaryType => java.sql.Types.BLOB
+ case TimestampType => java.sql.Types.TIMESTAMP
+ case DateType => java.sql.Types.DATE
+ case t: DecimalType => java.sql.Types.DECIMAL
+ case _ => throw new IllegalArgumentException(
+ s"Can't translate null value for field $field")
+ })
+ }
+
+ val rddSchema = df.schema
+ val driver: String = DriverRegistry.getDriverClassName(url)
+ val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
+ df.foreachPartition { iterator =>
+ savePartition(getConnection, table, iterator, rddSchema, nullTypes)
+ }
+ }
+
+}
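Taken together, these helpers support a simple create-if-absent-then-save flow. A sketch under assumptions: the connection details are placeholders, and the CREATE TABLE statement relies on schemaString producing DDL the target database accepts:

    import java.util.Properties
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

    object JdbcUtilsSketch {
      def writeIfAbsent(df: DataFrame, url: String, table: String): Unit = {
        val props = new Properties()
        val conn = JdbcUtils.createConnection(url, props)
        try {
          if (!JdbcUtils.tableExists(conn, table)) {
            // Builds the column list from the DataFrame schema via schemaString.
            val ddl = s"CREATE TABLE $table (${JdbcUtils.schemaString(df, url)})"
            conn.prepareStatement(ddl).executeUpdate()
          }
        } finally {
          conn.close()
        }
        // Writes each partition in its own transaction through savePartition.
        JdbcUtils.saveTable(df, url, table, props)
      }
    }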
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index ec5668c6b9..b6f3410bad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
-import org.apache.spark.sql.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
private[sql] object InferSchema {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 5bb9e62310..114c8b2118 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import java.io.CharArrayWriter
@@ -39,9 +39,10 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.util.SerializableConfiguration
-private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
- def format(): String = "json"
+class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+ override def shortName(): String = "json"
override def createRelation(
sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index d734e7e890..37c2b5a296 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b8fd3b9cc1..cd68bd667c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import java.io.ByteArrayOutputStream
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
index fde96852ce..005546f37d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 975fec101d..4049795ed3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.util.{Map => JMap}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
index 84f1dccfeb..ed9e0aa659 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 4fe8a39f20..3542dfbae1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index b12149dcf1..a3fc74cf79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import scala.collection.JavaConversions._
@@ -25,7 +25,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.parquet.schema._
-import org.apache.spark.sql.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
+import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLConf}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 1551afd7b7..2c6b914328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
index 6ed3580af0..ccd7ebf319 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{MapData, ArrayData}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d57b789f5c..9e2e232f50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.Serializable
import java.nio.ByteBuffer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b6db71b5b8..4086a139be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
import java.util.logging.{Level, Logger => JLogger}
@@ -51,7 +51,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
- def format(): String = "parquet"
+ override def shortName(): String = "parquet"
override def createRelation(
sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
index 9cd0250f9c..3191cf3d12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.math.BigInteger
import java.nio.{ByteBuffer, ByteOrder}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 3854f5bd39..019db34fc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.IOException
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 3b907e5da7..1b51a5e5c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.metric
+package org.apache.spark.sql.execution.metric
import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}
@@ -93,22 +93,6 @@ private[sql] class LongSQLMetric private[metric](name: String)
}
}
-/**
- * A specialized int Accumulable to avoid boxing and unboxing when using Accumulator's
- * `+=` and `add`.
- */
-private[sql] class IntSQLMetric private[metric](name: String)
- extends SQLMetric[IntSQLMetricValue, Int](name, IntSQLMetricParam) {
-
- override def +=(term: Int): Unit = {
- localValue.add(term)
- }
-
- override def add(term: Int): Unit = {
- localValue.add(term)
- }
-}
-
private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Long] {
override def addAccumulator(r: LongSQLMetricValue, t: Long): LongSQLMetricValue = r.add(t)
@@ -121,26 +105,8 @@ private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Lon
override def zero: LongSQLMetricValue = new LongSQLMetricValue(0L)
}
-private object IntSQLMetricParam extends SQLMetricParam[IntSQLMetricValue, Int] {
-
- override def addAccumulator(r: IntSQLMetricValue, t: Int): IntSQLMetricValue = r.add(t)
-
- override def addInPlace(r1: IntSQLMetricValue, r2: IntSQLMetricValue): IntSQLMetricValue =
- r1.add(r2.value)
-
- override def zero(initialValue: IntSQLMetricValue): IntSQLMetricValue = zero
-
- override def zero: IntSQLMetricValue = new IntSQLMetricValue(0)
-}
-
private[sql] object SQLMetrics {
- def createIntMetric(sc: SparkContext, name: String): IntSQLMetric = {
- val acc = new IntSQLMetric(name)
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
- acc
- }
-
def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
val acc = new LongSQLMetric(name)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
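With the Int variant removed, operators are expected to use the remaining long-valued metric. An internal-API sketch; SQLMetrics is private[sql], so the enclosing package below is a hypothetical subpackage of org.apache.spark.sql chosen only so the call compiles:

    package org.apache.spark.sql.execution.sketch

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.SQLMetrics

    object MetricSketch {
      def bumpRowCount(sc: SparkContext): Unit = {
        val numRows = SQLMetrics.createLongMetric(sc, "number of rows")
        numRows.add(1L) // LongSQLMetric accumulates Longs without boxing
      }
    }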
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
index 66237f8f13..28fa231e72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala
@@ -18,12 +18,6 @@
package org.apache.spark.sql
/**
- * :: DeveloperApi ::
- * An execution engine for relational query plans that runs on top Spark and returns RDDs.
- *
- * Note that the operators in this package are created automatically by a query planner using a
- * [[SQLContext]] and are not intended to be used directly by end users of Spark SQL. They are
- * documented here in order to make it easier for others to understand the performance
- * characteristics of query plans that are generated by Spark SQL.
+ * The physical execution component of Spark SQL. Note that this is a private package.
*/
package object execution
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index cb7ca60b2f..49646a99d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 52ddf99e92..f0b56c2eb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 2fd4fc658d..0b9bad987c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import scala.collection.mutable
@@ -26,7 +26,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
@@ -51,17 +51,14 @@ private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener wit
private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
- @VisibleForTesting
def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
_executionIdToData.toMap
}
- @VisibleForTesting
def jobIdToExecutionId: Map[Long, Long] = synchronized {
_jobIdToExecutionId.toMap
}
- @VisibleForTesting
def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
_stageIdToStageMetrics.toMap
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 3bba0afaf1..0b0867f67e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicInteger
@@ -38,12 +38,12 @@ private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
private[sql] object SQLTab {
- private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/ui/static"
+ private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
private val nextTabId = new AtomicInteger(0)
private def nextTabName: String = {
val nextId = nextTabId.getAndIncrement()
- if (nextId == 0) "SQL" else s"SQL${nextId}"
+ if (nextId == 0) "SQL" else s"SQL$nextId"
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 1ba50b95be..ae3d752dde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
/**
* A graph used for storing information of an executionPlan of DataFrame.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
deleted file mode 100644
index cc918c2371..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc
-
-import java.sql.{Connection, DriverManager}
-import java.util.Properties
-
-import scala.util.Try
-
-/**
- * Util functions for JDBC tables.
- */
-private[sql] object JdbcUtils {
-
- /**
- * Establishes a JDBC connection.
- */
- def createConnection(url: String, connectionProperties: Properties): Connection = {
- DriverManager.getConnection(url, connectionProperties)
- }
-
- /**
- * Returns true if the table already exists in the JDBC database.
- */
- def tableExists(conn: Connection, table: String): Boolean = {
- // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
- // SQL database systems, considering "table" could also include the database name.
- Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
- }
-
- /**
- * Drops a table from the JDBC database.
- */
- def dropTable(conn: Connection, table: String): Unit = {
- conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
deleted file mode 100644
index 035e051008..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.sql.{Connection, Driver, DriverManager, DriverPropertyInfo, PreparedStatement, SQLFeatureNotSupportedException}
-import java.util.Properties
-
-import scala.collection.mutable
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-package object jdbc {
- private[sql] object JDBCWriteDetails extends Logging {
- /**
- * Returns a PreparedStatement that inserts a row into table via conn.
- */
- def insertStatement(conn: Connection, table: String, rddSchema: StructType):
- PreparedStatement = {
- val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
- var fieldsLeft = rddSchema.fields.length
- while (fieldsLeft > 0) {
- sql.append("?")
- if (fieldsLeft > 1) sql.append(", ") else sql.append(")")
- fieldsLeft = fieldsLeft - 1
- }
- conn.prepareStatement(sql.toString)
- }
-
- /**
- * Saves a partition of a DataFrame to the JDBC database. This is done in
- * a single database transaction in order to avoid repeatedly inserting
- * data as much as possible.
- *
- * It is still theoretically possible for rows in a DataFrame to be
- * inserted into the database more than once if a stage somehow fails after
- * the commit occurs but before the stage can return successfully.
- *
- * This is not a closure inside saveTable() because apparently cosmetic
- * implementation changes elsewhere might easily render such a closure
- * non-Serializable. Instead, we explicitly close over all variables that
- * are used.
- */
- def savePartition(
- getConnection: () => Connection,
- table: String,
- iterator: Iterator[Row],
- rddSchema: StructType,
- nullTypes: Array[Int]): Iterator[Byte] = {
- val conn = getConnection()
- var committed = false
- try {
- conn.setAutoCommit(false) // Everything in the same db transaction.
- val stmt = insertStatement(conn, table, rddSchema)
- try {
- while (iterator.hasNext) {
- val row = iterator.next()
- val numFields = rddSchema.fields.length
- var i = 0
- while (i < numFields) {
- if (row.isNullAt(i)) {
- stmt.setNull(i + 1, nullTypes(i))
- } else {
- rddSchema.fields(i).dataType match {
- case IntegerType => stmt.setInt(i + 1, row.getInt(i))
- case LongType => stmt.setLong(i + 1, row.getLong(i))
- case DoubleType => stmt.setDouble(i + 1, row.getDouble(i))
- case FloatType => stmt.setFloat(i + 1, row.getFloat(i))
- case ShortType => stmt.setInt(i + 1, row.getShort(i))
- case ByteType => stmt.setInt(i + 1, row.getByte(i))
- case BooleanType => stmt.setBoolean(i + 1, row.getBoolean(i))
- case StringType => stmt.setString(i + 1, row.getString(i))
- case BinaryType => stmt.setBytes(i + 1, row.getAs[Array[Byte]](i))
- case TimestampType => stmt.setTimestamp(i + 1, row.getAs[java.sql.Timestamp](i))
- case DateType => stmt.setDate(i + 1, row.getAs[java.sql.Date](i))
- case t: DecimalType => stmt.setBigDecimal(i + 1, row.getDecimal(i))
- case _ => throw new IllegalArgumentException(
- s"Can't translate non-null value for field $i")
- }
- }
- i = i + 1
- }
- stmt.executeUpdate()
- }
- } finally {
- stmt.close()
- }
- conn.commit()
- committed = true
- } finally {
- if (!committed) {
- // The stage must fail. We got here through an exception path, so
- // let the exception through unless rollback() or close() want to
- // tell the user about another problem.
- conn.rollback()
- conn.close()
- } else {
- // The stage must succeed. We cannot propagate any exception close() might throw.
- try {
- conn.close()
- } catch {
- case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
- }
- }
- }
- Array[Byte]().iterator
- }
-
- /**
- * Compute the schema string for this RDD.
- */
- def schemaString(df: DataFrame, url: String): String = {
- val sb = new StringBuilder()
- val dialect = JdbcDialects.get(url)
- df.schema.fields foreach { field => {
- val name = field.name
- val typ: String =
- dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
- field.dataType match {
- case IntegerType => "INTEGER"
- case LongType => "BIGINT"
- case DoubleType => "DOUBLE PRECISION"
- case FloatType => "REAL"
- case ShortType => "INTEGER"
- case ByteType => "BYTE"
- case BooleanType => "BIT(1)"
- case StringType => "TEXT"
- case BinaryType => "BLOB"
- case TimestampType => "TIMESTAMP"
- case DateType => "DATE"
- case t: DecimalType => s"DECIMAL(${t.precision}},${t.scale}})"
- case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
- })
- val nullable = if (field.nullable) "" else "NOT NULL"
- sb.append(s", $name $typ $nullable")
- }}
- if (sb.length < 2) "" else sb.substring(2)
- }
-
- /**
- * Saves the RDD to the database in a single transaction.
- */
- def saveTable(
- df: DataFrame,
- url: String,
- table: String,
- properties: Properties = new Properties()) {
- val dialect = JdbcDialects.get(url)
- val nullTypes: Array[Int] = df.schema.fields.map { field =>
- dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
- field.dataType match {
- case IntegerType => java.sql.Types.INTEGER
- case LongType => java.sql.Types.BIGINT
- case DoubleType => java.sql.Types.DOUBLE
- case FloatType => java.sql.Types.REAL
- case ShortType => java.sql.Types.INTEGER
- case ByteType => java.sql.Types.INTEGER
- case BooleanType => java.sql.Types.BIT
- case StringType => java.sql.Types.CLOB
- case BinaryType => java.sql.Types.BLOB
- case TimestampType => java.sql.Types.TIMESTAMP
- case DateType => java.sql.Types.DATE
- case t: DecimalType => java.sql.Types.DECIMAL
- case _ => throw new IllegalArgumentException(
- s"Can't translate null value for field $field")
- })
- }
-
- val rddSchema = df.schema
- val driver: String = DriverRegistry.getDriverClassName(url)
- val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
- df.foreachPartition { iterator =>
- JDBCWriteDetails.savePartition(getConnection, table, iterator, rddSchema, nullTypes)
- }
- }
-
- }
-
- private [sql] class DriverWrapper(val wrapped: Driver) extends Driver {
- override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
-
- override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
-
- override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
- wrapped.getPropertyInfo(url, info)
- }
-
- override def getMinorVersion: Int = wrapped.getMinorVersion
-
- def getParentLogger: java.util.logging.Logger =
- throw new SQLFeatureNotSupportedException(
- s"${this.getClass().getName}.getParentLogger is not yet implemented.")
-
- override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
-
- override def getMajorVersion: Int = wrapped.getMajorVersion
- }
-
- /**
- * java.sql.DriverManager is always loaded by bootstrap classloader,
- * so it can't load JDBC drivers accessible by Spark ClassLoader.
- *
- * To solve the problem, drivers from user-supplied jars are wrapped
- * into thin wrapper.
- */
- private [sql] object DriverRegistry extends Logging {
-
- private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
-
- def register(className: String): Unit = {
- val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
- if (cls.getClassLoader == null) {
- logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
- } else if (wrapperMap.get(className).isDefined) {
- logTrace(s"Wrapper for $className already exists")
- } else {
- synchronized {
- if (wrapperMap.get(className).isEmpty) {
- val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
- DriverManager.registerDriver(wrapper)
- wrapperMap(className) = wrapper
- logTrace(s"Wrapper for $className registered")
- }
- }
- }
- }
-
- def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
- case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
- case driver => driver.getClass.getCanonicalName
- }
- }
-
-} // package object jdbc
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6bcabbab4f..2f8417a48d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -43,19 +43,24 @@ import org.apache.spark.util.SerializableConfiguration
* This allows users to give the data source alias as the format type over the fully qualified
* class name.
*
- * ex: parquet.DefaultSource.format = "parquet".
- *
 * A new instance of this class will be instantiated each time a DDL call is made.
+ *
+ * @since 1.5.0
*/
@DeveloperApi
trait DataSourceRegister {
/**
* The string that represents the format that this data source provider uses. This is
- * overridden by children to provide a nice alias for the data source,
- * ex: override def format(): String = "parquet"
+ * overridden by children to provide a nice alias for the data source. For example:
+ *
+ * {{{
+ *   override def shortName(): String = "parquet"
+ * }}}
+ *
+ * @since 1.5.0
*/
- def format(): String
+ def shortName(): String
}
/**
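A sketch of a third-party provider adopting the renamed hook; the class and alias below are hypothetical, and (as far as the resolution code is concerned) discovery also relies on a java.util.ServiceLoader entry for org.apache.spark.sql.sources.DataSourceRegister under META-INF/services:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

    class ExampleSource extends RelationProvider with DataSourceRegister {

      // Users can now write .format("example") instead of the fully qualified class name.
      override def shortName(): String = "example"

      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        // A real provider would build a BaseRelation from `parameters` here.
        throw new UnsupportedOperationException("illustration only")
      }
    }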
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/CompatibilityTest.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java
index daec65a5bb..70dec1a9d3 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/CompatibilityTest.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java
@@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
-package org.apache.spark.sql.parquet.test.avro;
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
@@ -12,6 +12,6 @@ public interface CompatibilityTest {
@SuppressWarnings("all")
public interface Callback extends CompatibilityTest {
- public static final org.apache.avro.Protocol PROTOCOL = org.apache.spark.sql.parquet.test.avro.CompatibilityTest.PROTOCOL;
+ public static final org.apache.avro.Protocol PROTOCOL = org.apache.spark.sql.execution.datasources.parquet.test.avro.CompatibilityTest.PROTOCOL;
}
} \ No newline at end of file
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/Nested.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java
index 051f1ee903..a0a406bcd1 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/Nested.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java
@@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
-package org.apache.spark.sql.parquet.test.avro;
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Nested extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
@@ -77,18 +77,18 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Creates a new Nested RecordBuilder */
- public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder() {
- return new org.apache.spark.sql.parquet.test.avro.Nested.Builder();
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder() {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder();
}
/** Creates a new Nested RecordBuilder by copying an existing Builder */
- public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
- return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder(other);
}
/** Creates a new Nested RecordBuilder by copying an existing Nested instance */
- public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested other) {
- return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder(other);
}
/**
@@ -102,11 +102,11 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
/** Creates a new Builder */
private Builder() {
- super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder other) {
super(other);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
@@ -119,8 +119,8 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Creates a Builder by copying an existing Nested instance */
- private Builder(org.apache.spark.sql.parquet.test.avro.Nested other) {
- super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested other) {
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.SCHEMA$);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
fieldSetFlags()[0] = true;
@@ -137,7 +137,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Sets the value of the 'nested_ints_column' field */
- public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
validate(fields()[0], value);
this.nested_ints_column = value;
fieldSetFlags()[0] = true;
@@ -150,7 +150,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Clears the value of the 'nested_ints_column' field */
- public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedIntsColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder clearNestedIntsColumn() {
nested_ints_column = null;
fieldSetFlags()[0] = false;
return this;
@@ -162,7 +162,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Sets the value of the 'nested_string_column' field */
- public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedStringColumn(java.lang.String value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder setNestedStringColumn(java.lang.String value) {
validate(fields()[1], value);
this.nested_string_column = value;
fieldSetFlags()[1] = true;
@@ -175,7 +175,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Clears the value of the 'nested_string_column' field */
- public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedStringColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder clearNestedStringColumn() {
nested_string_column = null;
fieldSetFlags()[1] = false;
return this;
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java
index 354c9d73cc..6198b00b1e 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java
@@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
-package org.apache.spark.sql.parquet.test.avro;
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
@@ -25,7 +25,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
@Deprecated public java.lang.String maybe_string_column;
@Deprecated public java.util.List<java.lang.String> strings_column;
@Deprecated public java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
- @Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
+ @Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column;
/**
* Default constructor. Note that this does not initialize fields
@@ -37,7 +37,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
/**
* All-args constructor.
*/
- public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column) {
+ public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column) {
this.bool_column = bool_column;
this.int_column = int_column;
this.long_column = long_column;
@@ -101,7 +101,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
case 13: maybe_string_column = (java.lang.String)value$; break;
case 14: strings_column = (java.util.List<java.lang.String>)value$; break;
case 15: string_to_int_column = (java.util.Map<java.lang.String,java.lang.Integer>)value$; break;
- case 16: complex_column = (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>)value$; break;
+ case 16: complex_column = (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>>)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
@@ -349,7 +349,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
/**
* Gets the value of the 'complex_column' field.
*/
- public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> getComplexColumn() {
+ public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> getComplexColumn() {
return complex_column;
}
@@ -357,23 +357,23 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
* Sets the value of the 'complex_column' field.
* @param value the value to set.
*/
- public void setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> value) {
+ public void setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> value) {
this.complex_column = value;
}
/** Creates a new ParquetAvroCompat RecordBuilder */
- public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder() {
- return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder();
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder() {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder();
}
/** Creates a new ParquetAvroCompat RecordBuilder by copying an existing Builder */
- public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
- return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder(other);
}
/** Creates a new ParquetAvroCompat RecordBuilder by copying an existing ParquetAvroCompat instance */
- public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
- return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder(other);
}
/**
@@ -398,15 +398,15 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
private java.lang.String maybe_string_column;
private java.util.List<java.lang.String> strings_column;
private java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
- private java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
+ private java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column;
/** Creates a new Builder */
private Builder() {
- super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder other) {
super(other);
if (isValidValue(fields()[0], other.bool_column)) {
this.bool_column = data().deepCopy(fields()[0].schema(), other.bool_column);
@@ -479,8 +479,8 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Creates a Builder by copying an existing ParquetAvroCompat instance */
- private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
- super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat other) {
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
if (isValidValue(fields()[0], other.bool_column)) {
this.bool_column = data().deepCopy(fields()[0].schema(), other.bool_column);
fieldSetFlags()[0] = true;
@@ -557,7 +557,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'bool_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setBoolColumn(boolean value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setBoolColumn(boolean value) {
validate(fields()[0], value);
this.bool_column = value;
fieldSetFlags()[0] = true;
@@ -570,7 +570,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'bool_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearBoolColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearBoolColumn() {
fieldSetFlags()[0] = false;
return this;
}
@@ -581,7 +581,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setIntColumn(int value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setIntColumn(int value) {
validate(fields()[1], value);
this.int_column = value;
fieldSetFlags()[1] = true;
@@ -594,7 +594,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearIntColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearIntColumn() {
fieldSetFlags()[1] = false;
return this;
}
@@ -605,7 +605,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'long_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setLongColumn(long value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setLongColumn(long value) {
validate(fields()[2], value);
this.long_column = value;
fieldSetFlags()[2] = true;
@@ -618,7 +618,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'long_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearLongColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearLongColumn() {
fieldSetFlags()[2] = false;
return this;
}
@@ -629,7 +629,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'float_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setFloatColumn(float value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setFloatColumn(float value) {
validate(fields()[3], value);
this.float_column = value;
fieldSetFlags()[3] = true;
@@ -642,7 +642,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'float_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearFloatColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearFloatColumn() {
fieldSetFlags()[3] = false;
return this;
}
@@ -653,7 +653,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'double_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setDoubleColumn(double value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setDoubleColumn(double value) {
validate(fields()[4], value);
this.double_column = value;
fieldSetFlags()[4] = true;
@@ -666,7 +666,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'double_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearDoubleColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearDoubleColumn() {
fieldSetFlags()[4] = false;
return this;
}
@@ -677,7 +677,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'binary_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setBinaryColumn(java.nio.ByteBuffer value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setBinaryColumn(java.nio.ByteBuffer value) {
validate(fields()[5], value);
this.binary_column = value;
fieldSetFlags()[5] = true;
@@ -690,7 +690,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'binary_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearBinaryColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearBinaryColumn() {
binary_column = null;
fieldSetFlags()[5] = false;
return this;
@@ -702,7 +702,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'string_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringColumn(java.lang.String value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringColumn(java.lang.String value) {
validate(fields()[6], value);
this.string_column = value;
fieldSetFlags()[6] = true;
@@ -715,7 +715,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'string_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringColumn() {
string_column = null;
fieldSetFlags()[6] = false;
return this;
@@ -727,7 +727,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_bool_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBoolColumn(java.lang.Boolean value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBoolColumn(java.lang.Boolean value) {
validate(fields()[7], value);
this.maybe_bool_column = value;
fieldSetFlags()[7] = true;
@@ -740,7 +740,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_bool_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBoolColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBoolColumn() {
maybe_bool_column = null;
fieldSetFlags()[7] = false;
return this;
@@ -752,7 +752,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeIntColumn(java.lang.Integer value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeIntColumn(java.lang.Integer value) {
validate(fields()[8], value);
this.maybe_int_column = value;
fieldSetFlags()[8] = true;
@@ -765,7 +765,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeIntColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeIntColumn() {
maybe_int_column = null;
fieldSetFlags()[8] = false;
return this;
@@ -777,7 +777,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_long_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeLongColumn(java.lang.Long value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeLongColumn(java.lang.Long value) {
validate(fields()[9], value);
this.maybe_long_column = value;
fieldSetFlags()[9] = true;
@@ -790,7 +790,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_long_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeLongColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeLongColumn() {
maybe_long_column = null;
fieldSetFlags()[9] = false;
return this;
@@ -802,7 +802,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_float_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeFloatColumn(java.lang.Float value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeFloatColumn(java.lang.Float value) {
validate(fields()[10], value);
this.maybe_float_column = value;
fieldSetFlags()[10] = true;
@@ -815,7 +815,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_float_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeFloatColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeFloatColumn() {
maybe_float_column = null;
fieldSetFlags()[10] = false;
return this;
@@ -827,7 +827,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_double_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeDoubleColumn(java.lang.Double value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeDoubleColumn(java.lang.Double value) {
validate(fields()[11], value);
this.maybe_double_column = value;
fieldSetFlags()[11] = true;
@@ -840,7 +840,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_double_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeDoubleColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeDoubleColumn() {
maybe_double_column = null;
fieldSetFlags()[11] = false;
return this;
@@ -852,7 +852,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_binary_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBinaryColumn(java.nio.ByteBuffer value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBinaryColumn(java.nio.ByteBuffer value) {
validate(fields()[12], value);
this.maybe_binary_column = value;
fieldSetFlags()[12] = true;
@@ -865,7 +865,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_binary_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBinaryColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBinaryColumn() {
maybe_binary_column = null;
fieldSetFlags()[12] = false;
return this;
@@ -877,7 +877,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_string_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeStringColumn(java.lang.String value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeStringColumn(java.lang.String value) {
validate(fields()[13], value);
this.maybe_string_column = value;
fieldSetFlags()[13] = true;
@@ -890,7 +890,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_string_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeStringColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeStringColumn() {
maybe_string_column = null;
fieldSetFlags()[13] = false;
return this;
@@ -902,7 +902,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'strings_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringsColumn(java.util.List<java.lang.String> value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringsColumn(java.util.List<java.lang.String> value) {
validate(fields()[14], value);
this.strings_column = value;
fieldSetFlags()[14] = true;
@@ -915,7 +915,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'strings_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringsColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringsColumn() {
strings_column = null;
fieldSetFlags()[14] = false;
return this;
@@ -927,7 +927,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'string_to_int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
validate(fields()[15], value);
this.string_to_int_column = value;
fieldSetFlags()[15] = true;
@@ -940,19 +940,19 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'string_to_int_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringToIntColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringToIntColumn() {
string_to_int_column = null;
fieldSetFlags()[15] = false;
return this;
}
/** Gets the value of the 'complex_column' field */
- public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> getComplexColumn() {
+ public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> getComplexColumn() {
return complex_column;
}
/** Sets the value of the 'complex_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> value) {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> value) {
validate(fields()[16], value);
this.complex_column = value;
fieldSetFlags()[16] = true;
@@ -965,7 +965,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'complex_column' field */
- public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearComplexColumn() {
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearComplexColumn() {
complex_column = null;
fieldSetFlags()[16] = false;
return this;
@@ -991,7 +991,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
record.maybe_string_column = fieldSetFlags()[13] ? this.maybe_string_column : (java.lang.String) defaultValue(fields()[13]);
record.strings_column = fieldSetFlags()[14] ? this.strings_column : (java.util.List<java.lang.String>) defaultValue(fields()[14]);
record.string_to_int_column = fieldSetFlags()[15] ? this.string_to_int_column : (java.util.Map<java.lang.String,java.lang.Integer>) defaultValue(fields()[15]);
- record.complex_column = fieldSetFlags()[16] ? this.complex_column : (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>) defaultValue(fields()[16]);
+ record.complex_column = fieldSetFlags()[16] ? this.complex_column : (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>>) defaultValue(fields()[16]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c49f256be5..adbd95197d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,8 +25,8 @@ import scala.util.Random
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 92022ff23d..73d5621897 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.sql.{Date, Timestamp}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf}
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
-import org.apache.spark.sql.json.InferSchema.compatibleType
+import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 369df56530..6b62c9a003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.json
+package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index bfa427349f..4d9c07bb7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
+import org.apache.spark.sql.execution.datasources.parquet.test.avro.{Nested, ParquetAvroCompat}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index 57478931cd..68f35b1f3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import scala.collection.JavaConversions._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index b6a7c4fbdd..7dd9680d8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.filter2.predicate.Operators._
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index b415da5b8c..ee925afe08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -373,7 +373,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
// _temporary should be missing if direct output committer works.
try {
configuration.set("spark.sql.parquet.output.committer.class",
- "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
+ classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
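The hunk above replaces a hardcoded committer class name with classOf[...].getCanonicalName. A small, hypothetical sketch of that pattern in isolation; ExampleCommitter and the package name are placeholders, not classes from this patch:

package example

import org.apache.hadoop.conf.Configuration

// Placeholder standing in for the real output committer class.
class ExampleCommitter

object CommitterConfig {
  def configure(conf: Configuration): Unit = {
    // Deriving the configured name from a class reference means a package move
    // updates the setting at compile time instead of leaving a stale string.
    conf.set(
      "spark.sql.parquet.output.committer.class",
      classOf[ExampleCommitter].getCanonicalName)
  }
}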
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 2eef10189f..73152de244 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import java.math.BigInteger
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 5c65a8ec57..5e6d9c1cd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 4a0b3b60f4..8f06de7ce7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 64e94056f2..3c6e54db4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
index 1c532d7879..92b1d82217 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.parquet
+package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d22160f538..953284c98b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.metric
+package org.apache.spark.sql.execution.metric
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@@ -41,16 +41,6 @@ class SQLMetricsSuite extends SparkFunSuite {
}
}
- test("IntSQLMetric should not box Int") {
- val l = SQLMetrics.createIntMetric(TestSQLContext.sparkContext, "Int")
- val f = () => { l += 1 }
- BoxingFinder.getClassReader(f.getClass).foreach { cl =>
- val boxingFinder = new BoxingFinder()
- cl.accept(boxingFinder, 0)
- assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
- }
- }
-
test("Normal accumulator should do boxing") {
// We need this test to make sure BoxingFinder works.
val l = TestSQLContext.sparkContext.accumulator(0L)
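With IntSQLMetric removed, only the Long-based metric remains alongside the accumulator test above. A hedged sketch of the Long-based counterpart, assuming createLongMetric mirrors the removed createIntMetric call; the exact signature is an assumption, and SQLMetrics is package-private, so this only compiles inside the org.apache.spark.sql package:

import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.test.TestSQLContext

// Assumption: createLongMetric takes (SparkContext, name), like the removed
// createIntMetric call did, and the returned metric supports +=.
val rows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "rows")
rows += 1L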
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 69a561e16a..41dd1896c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.sql.ui
+package org.apache.spark.sql.execution.ui
import java.util.Properties
import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.sql.metric.LongSQLMetricValue
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.SQLExecution
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 1907e643c8..562c279067 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -51,7 +51,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -75,7 +75,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -92,7 +92,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -107,7 +107,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -122,7 +122,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -139,7 +139,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -158,7 +158,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -175,7 +175,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable (a int, b string)
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -188,7 +188,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@@ -199,7 +199,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
- |USING org.apache.spark.sql.json.DefaultSource
+ |USING json
|OPTIONS (
| path '${path.toString}'
|) AS
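These hunks swap the fully qualified provider class for the registered short name. A minimal sketch of the same DDL issued directly, not part of the patch, against a made-up JSON path:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("using-json-alias").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// "json" resolves through the data source registry instead of a hardcoded class name.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE people
    |USING json
    |OPTIONS (
    |  path '/tmp/people.json'
    |)
  """.stripMargin)
sqlContext.sql("SELECT * FROM people").show()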
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 1a4d41b02c..392da0b082 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -20,9 +20,37 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+// please note that the META-INF/services had to be modified for the test directory for this to work
+class DDLSourceLoadSuite extends DataSourceTest {
+
+ test("data sources with the same name") {
+ intercept[RuntimeException] {
+ caseInsensitiveContext.read.format("Fluet da Bomb").load()
+ }
+ }
+
+ test("load data source from format alias") {
+ caseInsensitiveContext.read.format("gathering quorum").load().schema ==
+ StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ }
+
+ test("specify full classname with duplicate formats") {
+ caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+ .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ }
+
+ test("should fail to load ORC without HiveContext") {
+ intercept[ClassNotFoundException] {
+ caseInsensitiveContext.read.format("orc").load()
+ }
+ }
+}
+
+
class FakeSourceOne extends RelationProvider with DataSourceRegister {
- def format(): String = "Fluet da Bomb"
+ def shortName(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
@@ -35,7 +63,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister {
class FakeSourceTwo extends RelationProvider with DataSourceRegister {
- def format(): String = "Fluet da Bomb"
+ def shortName(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
@@ -48,7 +76,7 @@ class FakeSourceTwo extends RelationProvider with DataSourceRegister {
class FakeSourceThree extends RelationProvider with DataSourceRegister {
- def format(): String = "gathering quorum"
+ def shortName(): String = "gathering quorum"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
@@ -58,28 +86,3 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister {
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
-// please note that the META-INF/services had to be modified for the test directory for this to work
-class DDLSourceLoadSuite extends DataSourceTest {
-
- test("data sources with the same name") {
- intercept[RuntimeException] {
- caseInsensitiveContext.read.format("Fluet da Bomb").load()
- }
- }
-
- test("load data source from format alias") {
- caseInsensitiveContext.read.format("gathering quorum").load().schema ==
- StructType(Seq(StructField("stringType", StringType, nullable = false)))
- }
-
- test("specify full classname with duplicate formats") {
- caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
- .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
- }
-
- test("Loading Orc") {
- intercept[ClassNotFoundException] {
- caseInsensitiveContext.read.format("orc").load()
- }
- }
-}
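The suite above exercises lookup by the alias a provider advertises through the renamed shortName() hook. A hedged sketch of a third-party provider in the same style as the fake sources in this file; the class and its "myformat" alias are hypothetical, and alias lookup would additionally require an entry in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyFormatSource extends RelationProvider with DataSourceRegister {
  // Alias users can pass to read.format(...) or USING ... in DDL.
  override def shortName(): String = "myformat"

  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation = new BaseRelation {
    override def sqlContext: SQLContext = ctx
    override def schema: StructType =
      StructType(Seq(StructField("value", StringType, nullable = false)))
  }
}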
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 3cbf5467b2..27d1cd92fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -22,14 +22,39 @@ import org.apache.spark.sql.execution.datasources.ResolvedDataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
- test("builtin sources") {
- assert(ResolvedDataSource.lookupDataSource("jdbc") ===
- classOf[org.apache.spark.sql.jdbc.DefaultSource])
+ test("jdbc") {
+ assert(
+ ResolvedDataSource.lookupDataSource("jdbc") ===
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") ===
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") ===
+ classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+ }
- assert(ResolvedDataSource.lookupDataSource("json") ===
- classOf[org.apache.spark.sql.json.DefaultSource])
+ test("json") {
+ assert(
+ ResolvedDataSource.lookupDataSource("json") ===
+ classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") ===
+ classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") ===
+ classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
+ }
- assert(ResolvedDataSource.lookupDataSource("parquet") ===
- classOf[org.apache.spark.sql.parquet.DefaultSource])
+ test("parquet") {
+ assert(
+ ResolvedDataSource.lookupDataSource("parquet") ===
+ classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") ===
+ classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
+ assert(
+ ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") ===
+ classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
}
}
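The new per-format tests check that both the short name and the pre-move fully qualified names still resolve. A hypothetical sketch of the kind of alias map behind that behaviour; the real table lives in ResolvedDataSource and may differ in detail:

// Hypothetical illustration only: map legacy provider names to the relocated classes.
val backwardCompatibilityMap = Map(
  "org.apache.spark.sql.jdbc" ->
    "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource",
  "org.apache.spark.sql.json" ->
    "org.apache.spark.sql.execution.datasources.json.DefaultSource",
  "org.apache.spark.sql.parquet" ->
    "org.apache.spark.sql.execution.datasources.parquet.DefaultSource")

def normalizeProvider(name: String): String =
  backwardCompatibilityMap.getOrElse(name, name)

// normalizeProvider("org.apache.spark.sql.json") points at the moved DefaultSource.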
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7198a32df4..ac9aaed19d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 0c344c63fd..9f4f8b5789 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
@@ -49,9 +48,9 @@ import scala.collection.JavaConversions._
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
- def format(): String = "orc"
+ override def shortName(): String = "orc"
- def createRelation(
+ override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
dataSchema: Option[StructType],
@@ -144,7 +143,6 @@ private[orc] class OrcOutputWriter(
}
}
-@DeveloperApi
private[sql] class OrcRelation(
override val paths: Array[String],
maybeDataSchema: Option[StructType],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index a45c2d9572..1fa005d5f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.{QueryTest, Row}
case class Cases(lower: String, UPPER: String)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b73d666575..7f36a483a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index f00d3754c3..80eb9f122a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2fa7ae3fa2..79a136ae6f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c4bc60086f..50f02432da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index d280543a07..cb4cedddbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,12 +23,12 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, SaveMode, parquet}
+import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
- override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+ override val dataSourceName: String = "parquet"
import sqlContext._
import sqlContext.implicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 1813cc3322..48c37a1fa1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -53,7 +53,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String =
- classOf[org.apache.spark.sql.json.DefaultSource].getCanonicalName
+ classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource].getCanonicalName
import sqlContext._