From a3aec918bed22f8e33cf91dc0d6e712e6653c7d2 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Sat, 8 Aug 2015 11:03:01 -0700 Subject: [SPARK-9486][SQL] Add data source aliasing for external packages Users currently have to provide the full class name for external data sources, like: `sqlContext.read.format("com.databricks.spark.avro").load(path)` This allows external data source packages to register themselves using a Service Loader so that they can add custom alias like: `sqlContext.read.format("avro").load(path)` This makes it so that using external data source packages uses the same format as the internal data sources like parquet, json, etc. Author: Joseph Batchik Author: Joseph Batchik Closes #7802 from JDrit/service_loader and squashes the following commits: 49a01ec [Joseph Batchik] fixed a couple of format / error bugs e5e93b2 [Joseph Batchik] modified rat file to only excluded added services 72b349a [Joseph Batchik] fixed error with orc data source actually 9f93ea7 [Joseph Batchik] fixed error with orc data source 87b7f1c [Joseph Batchik] fixed typo 101cd22 [Joseph Batchik] removing unneeded changes 8f3cf43 [Joseph Batchik] merged in changes b63d337 [Joseph Batchik] merged in master 95ae030 [Joseph Batchik] changed the new trait to be used as a mixin for data source to register themselves 74db85e [Joseph Batchik] reformatted class loader ac2270d [Joseph Batchik] removing some added test a6926db [Joseph Batchik] added test cases for data source loader 208a2a8 [Joseph Batchik] changes to do error catching if there are multiple data sources 946186e [Joseph Batchik] started working on service loader --- .rat-excludes | 1 + ...org.apache.spark.sql.sources.DataSourceRegister | 3 + .../spark/sql/execution/datasources/ddl.scala | 52 ++++++------- .../org/apache/spark/sql/jdbc/JDBCRelation.scala | 5 +- .../org/apache/spark/sql/json/JSONRelation.scala | 5 +- .../apache/spark/sql/parquet/ParquetRelation.scala | 5 +- .../org/apache/spark/sql/sources/interfaces.scala | 21 ++++++ ...org.apache.spark.sql.sources.DataSourceRegister | 3 + .../spark/sql/sources/DDLSourceLoadSuite.scala | 85 ++++++++++++++++++++++ ...org.apache.spark.sql.sources.DataSourceRegister | 1 + .../apache/spark/sql/hive/orc/OrcRelation.scala | 5 +- 11 files changed, 156 insertions(+), 30 deletions(-) create mode 100644 sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister create mode 100644 sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala create mode 100644 sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/.rat-excludes b/.rat-excludes index 236c2db053..7277146584 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -93,3 +93,4 @@ INDEX .lintr gen-java.* .*avpr +org.apache.spark.sql.sources.DataSourceRegister diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000000..cc32d4b727 --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,3 @@ +org.apache.spark.sql.jdbc.DefaultSource +org.apache.spark.sql.json.DefaultSource +org.apache.spark.sql.parquet.DefaultSource diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 0cdb407ad5..8c2f297e42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -17,7 +17,12 @@ package org.apache.spark.sql.execution.datasources +import java.util.ServiceLoader + +import scala.collection.Iterator +import scala.collection.JavaConversions._ import scala.language.{existentials, implicitConversions} +import scala.util.{Failure, Success, Try} import scala.util.matching.Regex import org.apache.hadoop.fs.Path @@ -190,37 +195,32 @@ private[sql] class DDLParser( } } -private[sql] object ResolvedDataSource { - - private val builtinSources = Map( - "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource", - "json" -> "org.apache.spark.sql.json.DefaultSource", - "parquet" -> "org.apache.spark.sql.parquet.DefaultSource", - "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource" - ) +private[sql] object ResolvedDataSource extends Logging { /** Given a provider name, look up the data source class definition. */ def lookupDataSource(provider: String): Class[_] = { + val provider2 = s"$provider.DefaultSource" val loader = Utils.getContextOrSparkClassLoader - - if (builtinSources.contains(provider)) { - return loader.loadClass(builtinSources(provider)) - } - - try { - loader.loadClass(provider) - } catch { - case cnf: java.lang.ClassNotFoundException => - try { - loader.loadClass(provider + ".DefaultSource") - } catch { - case cnf: java.lang.ClassNotFoundException => - if (provider.startsWith("org.apache.spark.sql.hive.orc")) { - sys.error("The ORC data source must be used with Hive support enabled.") - } else { - sys.error(s"Failed to load class for data source: $provider") - } + val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) + + serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match { + /** the provider format did not match any given registered aliases */ + case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match { + case Success(dataSource) => dataSource + case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) { + throw new ClassNotFoundException( + "The ORC data source must be used with Hive support enabled.", error) + } else { + throw new ClassNotFoundException( + s"Failed to load class for data source: $provider", error) } + } + /** there is exactly one registered alias */ + case head :: Nil => head.getClass + /** There are multiple registered aliases for the input */ + case sources => sys.error(s"Multiple sources found for $provider, " + + s"(${sources.map(_.getClass.getName).mkString(", ")}), " + + "please specify the fully qualified class name") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index 41d0ecb4bb..48d97ced9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -77,7 +77,10 @@ private[sql] object JDBCRelation { } } -private[sql] class DefaultSource extends RelationProvider { +private[sql] class DefaultSource extends RelationProvider with DataSourceRegister { + + def format(): String = "jdbc" + /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 10f1367e69..b34a272ec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -37,7 +37,10 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Row, SQLContext} -private[sql] class DefaultSource extends HadoopFsRelationProvider { +private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { + + def format(): String = "json" + override def createRelation( sqlContext: SQLContext, paths: Array[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 48009b2fd0..b6db71b5b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -49,7 +49,10 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} -private[sql] class DefaultSource extends HadoopFsRelationProvider { +private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { + + def format(): String = "parquet" + override def createRelation( sqlContext: SQLContext, paths: Array[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index c5b7ee73eb..4aafec0e2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -37,6 +37,27 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.apache.spark.util.SerializableConfiguration +/** + * ::DeveloperApi:: + * Data sources should implement this trait so that they can register an alias to their data source. + * This allows users to give the data source alias as the format type over the fully qualified + * class name. + * + * ex: parquet.DefaultSource.format = "parquet". + * + * A new instance of this class with be instantiated each time a DDL call is made. + */ +@DeveloperApi +trait DataSourceRegister { + + /** + * The string that represents the format that this data source provider uses. This is + * overridden by children to provide a nice alias for the data source, + * ex: override def format(): String = "parquet" + */ + def format(): String +} + /** * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source. When diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000000..cfd7889b4a --- /dev/null +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,3 @@ +org.apache.spark.sql.sources.FakeSourceOne +org.apache.spark.sql.sources.FakeSourceTwo +org.apache.spark.sql.sources.FakeSourceThree diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala new file mode 100644 index 0000000000..1a4d41b02c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class FakeSourceOne extends RelationProvider with DataSourceRegister { + + def format(): String = "Fluet da Bomb" + + override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = + new BaseRelation { + override def sqlContext: SQLContext = cont + + override def schema: StructType = + StructType(Seq(StructField("stringType", StringType, nullable = false))) + } +} + +class FakeSourceTwo extends RelationProvider with DataSourceRegister { + + def format(): String = "Fluet da Bomb" + + override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = + new BaseRelation { + override def sqlContext: SQLContext = cont + + override def schema: StructType = + StructType(Seq(StructField("stringType", StringType, nullable = false))) + } +} + +class FakeSourceThree extends RelationProvider with DataSourceRegister { + + def format(): String = "gathering quorum" + + override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = + new BaseRelation { + override def sqlContext: SQLContext = cont + + override def schema: StructType = + StructType(Seq(StructField("stringType", StringType, nullable = false))) + } +} +// please note that the META-INF/services had to be modified for the test directory for this to work +class DDLSourceLoadSuite extends DataSourceTest { + + test("data sources with the same name") { + intercept[RuntimeException] { + caseInsensitiveContext.read.format("Fluet da Bomb").load() + } + } + + test("load data source from format alias") { + caseInsensitiveContext.read.format("gathering quorum").load().schema == + StructType(Seq(StructField("stringType", StringType, nullable = false))) + } + + test("specify full classname with duplicate formats") { + caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne") + .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))) + } + + test("Loading Orc") { + intercept[ClassNotFoundException] { + caseInsensitiveContext.read.format("orc").load() + } + } +} diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000000..4a774fbf1f --- /dev/null +++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +org.apache.spark.sql.hive.orc.DefaultSource diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 7c8704b47f..0c344c63fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -47,7 +47,10 @@ import org.apache.spark.util.SerializableConfiguration /* Implicit conversions */ import scala.collection.JavaConversions._ -private[sql] class DefaultSource extends HadoopFsRelationProvider { +private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { + + def format(): String = "orc" + def createRelation( sqlContext: SQLContext, paths: Array[String], -- cgit v1.2.3