author      Thu Kyaw <trk007@gmail.com>                  2014-12-18 20:08:32 -0800
committer   Michael Armbrust <michael@databricks.com>    2014-12-18 20:08:32 -0800
commit      b68bc6d2647f8a5caf8aa558e4115f9cc254f67c (patch)
tree        2d53dab2997d2b2f3775e7627df2b6e5bff3b750 /sql
parent      f728e0fe7e860fe6dd3437e248472a67a2d435f8 (diff)
[SPARK-3928][SQL] Support wildcard matches on Parquet files.
...arquetFile accept hadoop glob pattern in path.

Author: Thu Kyaw <trk007@gmail.com>

Closes #3407 from tkyaw/master and squashes the following commits:

19115ad [Thu Kyaw] Merge https://github.com/apache/spark
ceded32 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
d322c28 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
ce677c6 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
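A minimal usage sketch of the new behaviour (the HDFS path and table name below are hypothetical): the string handed to parquetFile may now contain Hadoop glob wildcards instead of naming a single file or directory.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("parquet-glob-sketch"))
val sqlContext = new SQLContext(sc)

// With this patch, the glob expands to every matching part file
// instead of requiring a single concrete file or directory.
val events = sqlContext.parquetFile("hdfs:///data/events/2014-12-*/part-r-*.parquet")
events.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").collect()
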
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala         |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala         | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala            | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala       | 26
5 files changed, 76 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e05b..8884204e50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -133,7 +133,9 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
}
/**
- * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
+ * Loads a Parquet file from a regular path, or from files matching a glob pattern in the path,
+ * returning the result as a [[JavaSchemaRDD]].
+ * Supported glob pattern syntax is described at [[http://tinyurl.com/kcqrzn8]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(
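As a hedged sketch of the glob syntax that doc comment points to (the path below is hypothetical), the patterns accepted here are the ones understood by Hadoop's FileSystem.globStatus: '*' matches any run of characters, '?' a single character, '[ab]' a character class, and '{a,b}' an alternation.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// List every part file under the 2014 or 2015 subdirectories (hypothetical layout).
// globStatus may return null in some Hadoop versions when nothing matches, hence the Option guard.
val fs = FileSystem.get(new Configuration())
val matches = Option(fs.globStatus(new Path("/data/parquet/{2014,2015}/part-*.parquet"))).getOrElse(Array.empty)
matches.foreach(status => println(status.getPath))
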
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5a49384ade..96bace1769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -621,7 +621,9 @@ private[parquet] object FileSystemHelper {
throw new IllegalArgumentException(
s"ParquetTableOperations: path $path does not exist or is not a directory")
}
- fs.listStatus(path).map(_.getPath)
+ fs.globStatus(path)
+ .flatMap { status => if(status.isDir) fs.listStatus(status.getPath) else List(status) }
+ .map(_.getPath)
}
/**
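A standalone sketch of what the patched helper now does, assuming a default FileSystem and a hypothetical directory layout: globStatus expands the wildcard, and any match that is still a directory is expanded one more level with listStatus, so both a plain directory path and a glob end up as a flat list of leaf files.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val pattern = new Path("/tmp/parquet-glob-demo/*")  // hypothetical layout

// Mirrors the patched FileSystemHelper: expand the glob, then flatten directory matches.
val leafPaths = fs.globStatus(pattern)
  .flatMap(status => if (status.isDir) fs.listStatus(status.getPath) else Array(status))
  .map(_.getPath)

leafPaths.foreach(println)
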
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index c0918a40d1..d5993656e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -422,5 +422,41 @@ private[sql] object ParquetTestData {
val first = reader.read()
assert(first != null)
} */
+
+ // to test glob pattern (wildcard pattern matching) for parquetFile input
+ val testGlobDir = Utils.createTempDir()
+ val testGlobSubDir1 = Utils.createTempDir(testGlobDir.getPath)
+ val testGlobSubDir2 = Utils.createTempDir(testGlobDir.getPath)
+ val testGlobSubDir3 = Utils.createTempDir(testGlobDir.getPath)
+
+ def writeGlobFiles() = {
+ val subDirs = Array(testGlobSubDir1, testGlobSubDir2, testGlobSubDir3)
+
+ subDirs.foreach { dir =>
+ val path: Path = new Path(new Path(dir.toURI), new Path("part-r-0.parquet"))
+ val job = new Job()
+ val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+
+ for(i <- 0 until 15) {
+ val record = new SimpleGroup(schema)
+ if(i % 3 == 0) {
+ record.add(0, true)
+ } else {
+ record.add(0, false)
+ }
+ if(i % 5 == 0) {
+ record.add(1, 5)
+ }
+ record.add(2, "abc")
+ record.add(3, i.toLong << 33)
+ record.add(4, 2.5F)
+ record.add(5, 4.5D)
+ writer.write(record)
+ }
+ writer.close()
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index fa37d1f2ae..0e6fb57d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -437,10 +437,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
val path = origPath.makeQualified(fs)
- val children = fs.listStatus(path).filterNot { status =>
- val name = status.getPath.getName
- (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
- }
+ val children =
+ fs
+ .globStatus(path)
+ .flatMap { status => if(status.isDir) fs.listStatus(status.getPath) else List(status) }
+ .filterNot { status =>
+ val name = status.getPath.getName
+ (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
+ }
ParquetRelation.enableLogForwarding()
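A hedged, standalone illustration of the filterNot predicate above, using hypothetical file names in place of real FileStatus objects: hidden files and job bookkeeping files such as _SUCCESS are dropped, while the _metadata summary file (ParquetFileWriter.PARQUET_METADATA_FILE) is kept because schema discovery reads it.

// Names starting with '.' or '_' are skipped, except the Parquet summary file.
val names = Seq("part-r-0.parquet", "_metadata", "_SUCCESS", ".part-r-0.parquet.crc")

val kept = names.filterNot { name =>
  (name(0) == '.' || name(0) == '_') && name != "_metadata"  // "_metadata" == ParquetFileWriter.PARQUET_METADATA_FILE
}
// kept == Seq("part-r-0.parquet", "_metadata")
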
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 0e5635d3e9..074855389d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
ParquetTestData.writeNestedFile2()
ParquetTestData.writeNestedFile3()
ParquetTestData.writeNestedFile4()
+ ParquetTestData.writeGlobFiles()
testRDD = parquetFile(ParquetTestData.testDir.toString)
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
@@ -110,6 +111,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
Utils.deleteRecursively(ParquetTestData.testNestedDir2)
Utils.deleteRecursively(ParquetTestData.testNestedDir3)
Utils.deleteRecursively(ParquetTestData.testNestedDir4)
+ Utils.deleteRecursively(ParquetTestData.testGlobDir)
// here we should also unregister the table??
setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, originalParquetFilterPushdownEnabled.toString)
@@ -1049,4 +1051,28 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
"Comparison predicate with null shouldn't be pushed down")
}
}
+
+ test("Import of simple Parquet files using glob wildcard pattern") {
+ val testGlobDir = ParquetTestData.testGlobDir.toString
+ val globPatterns = Array(testGlobDir + "/*/*", testGlobDir + "/spark-*/*", testGlobDir + "/?pa?k-*/*")
+ globPatterns.foreach { path =>
+ val result = parquetFile(path).collect()
+ assert(result.size === 45)
+ result.zipWithIndex.foreach {
+ case (row, index) => {
+ val checkBoolean =
+ if ((index % 15) % 3 == 0)
+ row(0) == true
+ else
+ row(0) == false
+ assert(checkBoolean === true, s"boolean field value in line $index did not match")
+ if ((index % 15) % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
+ assert(row(2) === "abc", s"string field value in line $index did not match")
+ assert(row(3) === ((index.toLong % 15) << 33), s"long value in line $index did not match")
+ assert(row(4) === 2.5F, s"float field value in line $index did not match")
+ assert(row(5) === 4.5D, s"double field value in line $index did not match")
+ }
+ }
+ }
+ }
}