author    Koert Kuipers <koert@tresata.com>    2015-10-17 14:56:24 -0700
committer Davies Liu <davies.liu@gmail.com>    2015-10-17 14:56:24 -0700
commit    57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789 (patch)
tree      bb148c00e30b44acca2bcc913cadb81dab76b3a3 /sql
parent    254937420678a299f06b6f4e2696c623da56cf3a (diff)
[SPARK-10185] [SQL] Feat sql comma separated paths
Make sure comma-separated paths get processed correctly in ResolvedDataSource for a HadoopFsRelationProvider.

Author: Koert Kuipers <koert@tresata.com>

Closes #8416 from koertkuipers/feat-sql-comma-separated-paths.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                           | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala  | 47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                            | 18
3 files changed, 66 insertions(+), 10 deletions(-)
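
Before the patch details, a minimal usage sketch of the feature this commit adds, mirroring the test case at the end of the diff. The application object, master setting, and directory names below are illustrative assumptions, not part of the patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Illustrative driver (Spark 1.6-era API): read two JSON directories in one call.
    // The directory names are hypothetical; note the first one contains a comma.
    object MultiPathLoadExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("multi-path-load").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)

        val df = sqlContext.read
          .format("json")
          .load(Array("/tmp/data/dir,1", "/tmp/data/dir2"))

        df.show()
        sc.stop()
      }
    }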
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index eacdea2c1e..e8651a3569 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
@@ -124,6 +125,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
/**
+ * Loads input in as a [[DataFrame]], for data sources that support multiple paths.
+ * Only works if the source is a HadoopFsRelationProvider.
+ *
+ * @since 1.6.0
+ */
+ def load(paths: Array[String]): DataFrame = {
+ option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load()
+ }
+
+ /**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
*
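
As a rough illustration of what the new load(paths) overload above stores in the "paths" option, here is a small sketch using Hadoop's StringUtils (which the added import brings in); the sample paths are made up:

    import org.apache.hadoop.util.StringUtils

    // Each literal comma in a path is escaped with a backslash, then the
    // escaped paths are joined with commas into a single "paths" option value.
    val paths = Array("/data/dir,1", "/data/dir2")
    val joined = paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")
    // joined == "/data/dir\,1,/data/dir2"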
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 0117244366..54beabbf63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
import scala.util.{Success, Failure, Try}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
@@ -89,7 +90,11 @@ object ResolvedDataSource extends Logging {
val relation = userSpecifiedSchema match {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ if (caseInsensitiveOptions.contains("paths")) {
+ throw new AnalysisException(s"$className does not support paths option.")
+ }
+ dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
@@ -99,10 +104,19 @@ object ResolvedDataSource extends Logging {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+ if (caseInsensitiveOptions.contains("paths") &&
+ caseInsensitiveOptions.contains("path")) {
+ throw new AnalysisException(s"Both path and paths options are present.")
+ }
+ caseInsensitiveOptions.get("paths")
+ .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+ .getOrElse(Array(caseInsensitiveOptions("path")))
+ .flatMap{ pathString =>
+ val hdfsPath = new Path(pathString)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+ }
}
val dataSchema =
@@ -122,14 +136,27 @@ object ResolvedDataSource extends Logging {
case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ if (caseInsensitiveOptions.contains("paths")) {
+ throw new AnalysisException(s"$className does not support paths option.")
+ }
+ dataSource.createRelation(sqlContext, caseInsensitiveOptions)
case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+ if (caseInsensitiveOptions.contains("paths") &&
+ caseInsensitiveOptions.contains("path")) {
+ throw new AnalysisException(s"Both path and paths options are present.")
+ }
+ caseInsensitiveOptions.get("paths")
+ .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+ .getOrElse(Array(caseInsensitiveOptions("path")))
+ .flatMap{ pathString =>
+ val hdfsPath = new Path(pathString)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+ }
}
dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
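
A small sketch of the reverse step that ResolvedDataSource applies to the "paths" option above: split on commas that are not preceded by a backslash, then unescape each piece. The sample value is illustrative.

    import org.apache.hadoop.util.StringUtils

    // The negative lookbehind (?<!\\) makes the split skip escaped commas;
    // unEscapeString then strips the backslash escapes from each piece.
    val joined = "/data/dir\\,1,/data/dir2"   // as produced by DataFrameReader.load(paths)
    val restored = joined
      .split("(?<!\\\\),")
      .map(StringUtils.unEscapeString(_, '\\', ','))
    // restored: Array("/data/dir,1", "/data/dir2")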
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d919877746..832ea02cb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -890,6 +890,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.collect()
}
+ test("SPARK-10185: Read multiple Hadoop Filesystem paths and paths with a comma in it") {
+ withTempDir { dir =>
+ val df1 = Seq((1, 22)).toDF("a", "b")
+ val dir1 = new File(dir, "dir,1").getCanonicalPath
+ df1.write.format("json").save(dir1)
+
+ val df2 = Seq((2, 23)).toDF("a", "b")
+ val dir2 = new File(dir, "dir2").getCanonicalPath
+ df2.write.format("json").save(dir2)
+
+ checkAnswer(sqlContext.read.format("json").load(Array(dir1, dir2)),
+ Row(1, 22) :: Row(2, 23) :: Nil)
+
+ checkAnswer(sqlContext.read.format("json").load(dir1),
+ Row(1, 22) :: Nil)
+ }
+ }
+
test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") {
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)