author     Joseph Batchik <joseph.batchik@cloudera.com>    2015-08-08 11:03:01 -0700
committer  Reynold Xin <rxin@databricks.com>                2015-08-08 11:03:01 -0700
commit     a3aec918bed22f8e33cf91dc0d6e712e6653c7d2 (patch)
tree       6c8bf644c083f7e7f0ede49873debb45d805cb5d /sql/core/src/main
parent     23695f1d2d7ef9f3ea92cebcd96b1cf0e8904eb4 (diff)
[SPARK-9486][SQL] Add data source aliasing for external packages
Users currently have to provide the full class name for external data sources, for example:

`sqlContext.read.format("com.databricks.spark.avro").load(path)`

This change allows external data source packages to register themselves through a ServiceLoader so that they can add a custom alias, for example:

`sqlContext.read.format("avro").load(path)`

External data source packages can then be used with the same format names as the built-in data sources such as parquet, json, etc.

Author: Joseph Batchik <joseph.batchik@cloudera.com>
Author: Joseph Batchik <josephbatchik@gmail.com>

Closes #7802 from JDrit/service_loader and squashes the following commits:

49a01ec [Joseph Batchik] fixed a couple of format / error bugs
e5e93b2 [Joseph Batchik] modified rat file to only excluded added services
72b349a [Joseph Batchik] fixed error with orc data source actually
9f93ea7 [Joseph Batchik] fixed error with orc data source
87b7f1c [Joseph Batchik] fixed typo
101cd22 [Joseph Batchik] removing unneeded changes
8f3cf43 [Joseph Batchik] merged in changes
b63d337 [Joseph Batchik] merged in master
95ae030 [Joseph Batchik] changed the new trait to be used as a mixin for data source to register themselves
74db85e [Joseph Batchik] reformatted class loader
ac2270d [Joseph Batchik] removing some added test
a6926db [Joseph Batchik] added test cases for data source loader
208a2a8 [Joseph Batchik] changes to do error catching if there are multiple data sources
946186e [Joseph Batchik] started working on service loader
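As a rough sketch of what this enables (the com.example.avro package and class below are purely illustrative and not part of this patch), an external data source provider would mix in the new DataSourceRegister trait and report its alias via format():

package com.example.avro

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

// Hypothetical external provider registering the "avro" alias.
class DefaultSource extends RelationProvider with DataSourceRegister {

  // Alias picked up by the ServiceLoader-based lookup added in this patch.
  def format(): String = "avro"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Build and return the relation for the given parameters.
    ???
  }
}

Together with a META-INF/services entry naming this class (see the new resource file below), users can then write `sqlContext.read.format("avro").load(path)`.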
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister    3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala                    52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala                             5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala                             5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala                       5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                           21
6 files changed, 62 insertions, 29 deletions
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000000..cc32d4b727
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,3 @@
+org.apache.spark.sql.jdbc.DefaultSource
+org.apache.spark.sql.json.DefaultSource
+org.apache.spark.sql.parquet.DefaultSource
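An external package would ship a resource file of the same name in its own jar; for the hypothetical provider sketched above it would contain a single line (illustrative only):

# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.avro.DefaultSource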
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 0cdb407ad5..8c2f297e42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,7 +17,12 @@
package org.apache.spark.sql.execution.datasources
+import java.util.ServiceLoader
+
+import scala.collection.Iterator
+import scala.collection.JavaConversions._
import scala.language.{existentials, implicitConversions}
+import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex
import org.apache.hadoop.fs.Path
@@ -190,37 +195,32 @@ private[sql] class DDLParser(
}
}
-private[sql] object ResolvedDataSource {
-
- private val builtinSources = Map(
- "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
- "json" -> "org.apache.spark.sql.json.DefaultSource",
- "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
- "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
- )
+private[sql] object ResolvedDataSource extends Logging {
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
+ val provider2 = s"$provider.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
-
- if (builtinSources.contains(provider)) {
- return loader.loadClass(builtinSources(provider))
- }
-
- try {
- loader.loadClass(provider)
- } catch {
- case cnf: java.lang.ClassNotFoundException =>
- try {
- loader.loadClass(provider + ".DefaultSource")
- } catch {
- case cnf: java.lang.ClassNotFoundException =>
- if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
- sys.error("The ORC data source must be used with Hive support enabled.")
- } else {
- sys.error(s"Failed to load class for data source: $provider")
- }
+ val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+ serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match {
+ /** the provider format did not match any given registered aliases */
+ case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+ case Success(dataSource) => dataSource
+ case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+ throw new ClassNotFoundException(
+ "The ORC data source must be used with Hive support enabled.", error)
+ } else {
+ throw new ClassNotFoundException(
+ s"Failed to load class for data source: $provider", error)
}
+ }
+ /** there is exactly one registered alias */
+ case head :: Nil => head.getClass
+ /** There are multiple registered aliases for the input */
+ case sources => sys.error(s"Multiple sources found for $provider, " +
+ s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+ "please specify the fully qualified class name")
}
}
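For reference, here is how the three provider forms now resolve through the rewritten lookupDataSource; the sqlContext and path values are illustrative:

// 1. Registered alias, matched by the ServiceLoader scan above.
sqlContext.read.format("json").load(path)

// 2. Fully qualified class name, loaded directly by the class loader.
sqlContext.read.format("org.apache.spark.sql.json.DefaultSource").load(path)

// 3. Package name; the ".DefaultSource" suffix is tried as a fallback.
sqlContext.read.format("org.apache.spark.sql.json").load(path)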
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 41d0ecb4bb..48d97ced9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -77,7 +77,10 @@ private[sql] object JDBCRelation {
}
}
-private[sql] class DefaultSource extends RelationProvider {
+private[sql] class DefaultSource extends RelationProvider with DataSourceRegister {
+
+ def format(): String = "jdbc"
+
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 10f1367e69..b34a272ec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+ def format(): String = "json"
+
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 48009b2fd0..b6db71b5b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -49,7 +49,10 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+ def format(): String = "parquet"
+
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c5b7ee73eb..4aafec0e2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -39,6 +39,27 @@ import org.apache.spark.util.SerializableConfiguration
/**
* ::DeveloperApi::
+ * Data sources should implement this trait so that they can register an alias for their data source.
+ * This allows users to give the data source alias as the format type over the fully qualified
+ * class name.
+ *
+ * ex: parquet.DefaultSource.format = "parquet".
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ */
+@DeveloperApi
+trait DataSourceRegister {
+
+ /**
+ * The string that represents the format that this data source provider uses. This is
+ * overridden by children to provide a nice alias for the data source,
+ * ex: override def format(): String = "parquet"
+ */
+ def format(): String
+}
+
+/**
+ * ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
* RelationProvider), this interface is used to pass in the parameters specified by a user.