author     Zhan Zhang <zhazhan@gmail.com>              2015-05-18 12:03:27 -0700
committer  Michael Armbrust <michael@databricks.com>   2015-05-18 12:03:40 -0700
commit     aa31e431fc09f0477f1c2351c6275769a31aca90 (patch)
tree       58ec159706a2dc7703b1eeba5d466b92a20e3147 /sql/core
parent     9c7e802a5a2b8cd3eb77642f84c54a8e976fc996 (diff)
[SPARK-2883] [SQL] ORC data source for Spark SQL
This PR updates PR #6135 authored by zhzhan from Hortonworks.

----

This PR implements a Spark SQL data source for accessing ORC files.

> NOTE
>
> Although ORC is now an Apache TLP, the codebase is still tightly coupled with Hive.
> That's why the new ORC data source lives under the `org.apache.spark.sql.hive` package
> and must be used with `HiveContext`. However, it doesn't require an existing Hive
> installation to access ORC files.

1. Saving/loading ORC files without contacting the Hive metastore
2. Support for complex data types (i.e. array, map, and struct)
3. Awareness of common optimizations provided by Spark SQL:
   - Column pruning
   - Partition pruning
   - Filter push-down
4. Schema evolution support
5. Hive metastore table conversion

This PR also includes initial work done by scwf from Huawei (PR #3753).

Author: Zhan Zhang <zhazhan@gmail.com>
Author: Cheng Lian <lian@databricks.com>

Closes #6194 from liancheng/polishing-orc and squashes the following commits:

55ecd96 [Cheng Lian] Reorganizes ORC test suites
d4afeed [Cheng Lian] Addresses comments
21ada22 [Cheng Lian] Adds @since and @Experimental annotations
128bd3b [Cheng Lian] ORC filter bug fix
d734496 [Cheng Lian] Polishes the ORC data source
2650a42 [Zhan Zhang] resolve review comments
3c9038e [Zhan Zhang] resolve review comments
7b3c7c5 [Zhan Zhang] save mode fix
f95abfd [Zhan Zhang] reuse test suite
7cc2c64 [Zhan Zhang] predicate fix
4e61c16 [Zhan Zhang] minor change
305418c [Zhan Zhang] orc data source support
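For context, a minimal usage sketch of the data source described above; this is not code from the patch. It assumes a Spark 1.4-era HiveContext named `hiveContext`, an existing SparkContext `sc`, and an illustrative output path `/tmp/people.orc`.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    // Optional: enable ORC filter push-down (off by default; see the SQLConf change below).
    hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

    // Save a DataFrame as ORC without contacting the Hive metastore.
    val people = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")
    people.write.format("orc").mode(SaveMode.Overwrite).save("/tmp/people.orc")

    // Read it back; column pruning and the pushed-down predicate apply to the ORC scan.
    val adults = hiveContext.read.format("orc").load("/tmp/people.orc")
      .where($"age" > 21)
      .select("name")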
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala              |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala  | 61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala          | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala    | 81
4 files changed, 103 insertions(+), 64 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f07bb196c1..6da910e332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -43,6 +43,8 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
+ val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
+
val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
@@ -143,6 +145,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+ private[spark] def orcFilterPushDown =
+ getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
/** When true uses verifyPartitionPath to prune the path which is not exists. */
private[spark] def verifyPartitionPath =
getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
@@ -254,7 +259,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def dataFrameRetainGroupColumns: Boolean =
getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
-
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
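For reference, the new ORC flag mirrors the Parquet one and defaults to "false"; a small sketch (not from the patch) of toggling it at runtime, assuming a live `sqlContext`:

    // Programmatically:
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

    // Or through SQL:
    sqlContext.sql("SET spark.sql.orc.filterPushdown=true")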
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 7a73b6f1ac..516ba373f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -21,10 +21,9 @@ import java.io.File
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-import scala.util.Try
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{DataFrame, SaveMode}
/**
* A helper trait that provides convenient facilities for Parquet testing.
@@ -33,54 +32,9 @@ import org.apache.spark.util.Utils
* convenient to use tuples rather than special case classes when writing test cases/suites.
* Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
-private[sql] trait ParquetTest {
- val sqlContext: SQLContext
-
+private[sql] trait ParquetTest extends SQLTestUtils {
import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
- import sqlContext.{conf, sparkContext}
-
- protected def configuration = sparkContext.hadoopConfiguration
-
- /**
- * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
- * configurations.
- *
- * @todo Probably this method should be moved to a more general place
- */
- protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
- val (keys, values) = pairs.unzip
- val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
- (keys, values).zipped.foreach(conf.setConf)
- try f finally {
- keys.zip(currentValues).foreach {
- case (key, Some(value)) => conf.setConf(key, value)
- case (key, None) => conf.unsetConf(key)
- }
- }
- }
-
- /**
- * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
- * a file/directory is created there by `f`, it will be delete after `f` returns.
- *
- * @todo Probably this method should be moved to a more general place
- */
- protected def withTempPath(f: File => Unit): Unit = {
- val path = Utils.createTempDir()
- path.delete()
- try f(path) finally Utils.deleteRecursively(path)
- }
-
- /**
- * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
- * returns.
- *
- * @todo Probably this method should be moved to a more general place
- */
- protected def withTempDir(f: File => Unit): Unit = {
- val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally Utils.deleteRecursively(dir)
- }
+ import sqlContext.sparkContext
/**
* Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -106,13 +60,6 @@ private[sql] trait ParquetTest {
}
/**
- * Drops temporary table `tableName` after calling `f`.
- */
- protected def withTempTable(tableName: String)(f: => Unit): Unit = {
- try f finally sqlContext.dropTempTable(tableName)
- }
-
- /**
* Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
* temporary table named `tableName`, then call `f`. The temporary table together with the
* Parquet file will be dropped/deleted after `f` returns.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 37a569db31..a13ab74852 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -188,18 +188,20 @@ private[sql] class DDLParser(
private[sql] object ResolvedDataSource {
private val builtinSources = Map(
- "jdbc" -> classOf[org.apache.spark.sql.jdbc.DefaultSource],
- "json" -> classOf[org.apache.spark.sql.json.DefaultSource],
- "parquet" -> classOf[org.apache.spark.sql.parquet.DefaultSource]
+ "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
+ "json" -> "org.apache.spark.sql.json.DefaultSource",
+ "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
+ "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
)
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
+ val loader = Utils.getContextOrSparkClassLoader
+
if (builtinSources.contains(provider)) {
- return builtinSources(provider)
+ return loader.loadClass(builtinSources(provider))
}
- val loader = Utils.getContextOrSparkClassLoader
try {
loader.loadClass(provider)
} catch {
@@ -208,7 +210,11 @@ private[sql] object ResolvedDataSource {
loader.loadClass(provider + ".DefaultSource")
} catch {
case cnf: java.lang.ClassNotFoundException =>
- sys.error(s"Failed to load class for data source: $provider")
+ if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+ sys.error("The ORC data source must be used with Hive support enabled.")
+ } else {
+ sys.error(s"Failed to load class for data source: $provider")
+ }
}
}
}
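With this change the short name `orc` is resolved lazily by class name, so sql/core needs no compile-time dependency on the hive module; the class can only be loaded when Hive support is on the classpath. A hypothetical sketch of what callers see (path and table name are made up):

    // Resolves through the builtinSources map above to
    // org.apache.spark.sql.hive.orc.DefaultSource.
    hiveContext.read.format("orc").load("/tmp/people.orc")

    // The same short name works in data source DDL:
    hiveContext.sql(
      """CREATE TEMPORARY TABLE people
        |USING orc
        |OPTIONS (path '/tmp/people.orc')
      """.stripMargin)

    // Without Hive support built in, either form fails with:
    //   "The ORC data source must be used with Hive support enabled."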
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
new file mode 100644
index 0000000000..75d290625e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import java.io.File
+
+import scala.util.Try
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.util.Utils
+
+trait SQLTestUtils {
+ val sqlContext: SQLContext
+
+ import sqlContext.{conf, sparkContext}
+
+ protected def configuration = sparkContext.hadoopConfiguration
+
+ /**
+ * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
+ * configurations.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
+ (keys, values).zipped.foreach(conf.setConf)
+ try f finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConf(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+
+ /**
+ * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
+ * a file/directory is created there by `f`, it will be delete after `f` returns.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withTempPath(f: File => Unit): Unit = {
+ val path = Utils.createTempDir()
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
+ }
+
+ /**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+ *
+ * @todo Probably this method should be moved to a more general place
+ */
+ protected def withTempDir(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir().getCanonicalFile
+ try f(dir) finally Utils.deleteRecursively(dir)
+ }
+
+ /**
+ * Drops temporary table `tableName` after calling `f`.
+ */
+ protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+ try f finally sqlContext.dropTempTable(tableName)
+ }
+}
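Finally, a hypothetical example (not part of the patch) of a suite mixing in these helpers; the suite name, table name, and data are illustrative, and `TestSQLContext` is assumed to be available as in Spark's own tests.

    import org.scalatest.FunSuite

    import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}

    class ExampleSuite extends FunSuite with SQLTestUtils {
      val sqlContext = TestSQLContext

      import sqlContext.implicits._

      test("helpers compose and clean up after themselves") {
        withSQLConf("spark.sql.shuffle.partitions" -> "1") {
          withTempPath { file =>
            // `file` does not exist yet; whatever is written there is deleted afterwards.
            Seq(1, 2, 3).map(Tuple1.apply).toDF("i").write.parquet(file.getCanonicalPath)
            withTempTable("tmp") {
              sqlContext.read.parquet(file.getCanonicalPath).registerTempTable("tmp")
              assert(sqlContext.table("tmp").count() === 3)
            }
          }
        }
      }
    }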