diff options
author | Koert Kuipers <koert@tresata.com> | 2015-10-17 14:56:24 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-17 14:56:24 -0700 |
commit | 57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789 (patch) | |
tree | bb148c00e30b44acca2bcc913cadb81dab76b3a3 /sql/core | |
parent | 254937420678a299f06b6f4e2696c623da56cf3a (diff) | |
download | spark-57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789.tar.gz spark-57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789.tar.bz2 spark-57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789.zip |
[SPARK-10185] [SQL] Feat sql comma separated paths
Make sure comma-separated paths get processed correctly in ResolvedDataSource for a HadoopFsRelationProvider
Author: Koert Kuipers <koert@tresata.com>
Closes #8416 from koertkuipers/feat-sql-comma-separated-paths.
Diffstat (limited to 'sql/core')
3 files changed, 66 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index eacdea2c1e..e8651a3569 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -22,6 +22,7 @@ import java.util.Properties import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path +import org.apache.hadoop.util.StringUtils import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD @@ -124,6 +125,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { } /** + * Loads input in as a [[DataFrame]], for data sources that support multiple paths. + * Only works if the source is a HadoopFsRelationProvider. + * + * @since 1.6.0 + */ + def load(paths: Array[String]): DataFrame = { + option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load() + } + + /** * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table and connection properties. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 0117244366..54beabbf63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions} import scala.util.{Success, Failure, Try} import org.apache.hadoop.fs.Path +import org.apache.hadoop.util.StringUtils import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil @@ -89,7 +90,11 @@ object ResolvedDataSource extends Logging { val relation = userSpecifiedSchema match { case Some(schema: StructType) => clazz.newInstance() match { case dataSource: SchemaRelationProvider => - dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("paths")) { + throw new AnalysisException(s"$className does not support paths option.") + } + dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema) case dataSource: HadoopFsRelationProvider => val maybePartitionsSchema = if (partitionColumns.isEmpty) { None @@ -99,10 +104,19 @@ object ResolvedDataSource extends Logging { val caseInsensitiveOptions = new CaseInsensitiveMap(options) val paths = { - val patternPath = new Path(caseInsensitiveOptions("path")) - val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray + if (caseInsensitiveOptions.contains("paths") && + caseInsensitiveOptions.contains("path")) { + throw new AnalysisException(s"Both path and paths options are present.") + } 
+ caseInsensitiveOptions.get("paths") + .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ','))) + .getOrElse(Array(caseInsensitiveOptions("path"))) + .flatMap{ pathString => + val hdfsPath = new Path(pathString) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString) + } } val dataSchema = @@ -122,14 +136,27 @@ object ResolvedDataSource extends Logging { case None => clazz.newInstance() match { case dataSource: RelationProvider => - dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + if (caseInsensitiveOptions.contains("paths")) { + throw new AnalysisException(s"$className does not support paths option.") + } + dataSource.createRelation(sqlContext, caseInsensitiveOptions) case dataSource: HadoopFsRelationProvider => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val paths = { - val patternPath = new Path(caseInsensitiveOptions("path")) - val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray + if (caseInsensitiveOptions.contains("paths") && + caseInsensitiveOptions.contains("path")) { + throw new AnalysisException(s"Both path and paths options are present.") + } + caseInsensitiveOptions.get("paths") + .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ','))) + .getOrElse(Array(caseInsensitiveOptions("path"))) + .flatMap{ pathString => + val hdfsPath = new Path(pathString) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + 
SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString) + } } dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions) case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index d919877746..832ea02cb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -890,6 +890,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .collect() } + test("SPARK-10185: Read multiple Hadoop Filesystem paths and paths with a comma in it") { + withTempDir { dir => + val df1 = Seq((1, 22)).toDF("a", "b") + val dir1 = new File(dir, "dir,1").getCanonicalPath + df1.write.format("json").save(dir1) + + val df2 = Seq((2, 23)).toDF("a", "b") + val dir2 = new File(dir, "dir2").getCanonicalPath + df2.write.format("json").save(dir2) + + checkAnswer(sqlContext.read.format("json").load(Array(dir1, dir2)), + Row(1, 22) :: Row(2, 23) :: Nil) + + checkAnswer(sqlContext.read.format("json").load(dir1), + Row(1, 22) :: Nil) + } + } + test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") { val df = Seq(1 -> 2).toDF("i", "j") val query = df.groupBy('i) |