From b68bc6d2647f8a5caf8aa558e4115f9cc254f67c Mon Sep 17 00:00:00 2001
From: Thu Kyaw
Date: Thu, 18 Dec 2014 20:08:32 -0800
Subject: [SPARK-3928][SQL] Support wildcard matches on Parquet files.

...arquetFile accepts hadoop glob pattern in path.

Author: Thu Kyaw

Closes #3407 from tkyaw/master and squashes the following commits:

19115ad [Thu Kyaw] Merge https://github.com/apache/spark
ceded32 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
d322c28 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
ce677c6 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
---
 .../apache/spark/sql/api/java/JavaSQLContext.scala |  4 ++-
 .../spark/sql/parquet/ParquetTableOperations.scala |  4 ++-
 .../apache/spark/sql/parquet/ParquetTestData.scala | 36 ++++++++++++++++++++++
 .../apache/spark/sql/parquet/ParquetTypes.scala    | 12 +++++---
 .../spark/sql/parquet/ParquetQuerySuite.scala      | 26 ++++++++++++++++
 5 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e05b..8884204e50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -133,7 +133,9 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
   }
 
   /**
-   * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
+   * Loads a parquet file from a regular path, or from files that match file patterns in the path,
+   * returning the result as a [[JavaSchemaRDD]].
+   * See [[http://tinyurl.com/kcqrzn8]] for the supported glob file pattern syntax.
    */
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5a49384ade..96bace1769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -621,7 +621,9 @@ private[parquet] object FileSystemHelper {
       throw new IllegalArgumentException(
         s"ParquetTableOperations: path $path does not exist or is not a directory")
     }
-    fs.listStatus(path).map(_.getPath)
+    fs.globStatus(path)
+      .flatMap { status => if(status.isDir) fs.listStatus(status.getPath) else List(status) }
+      .map(_.getPath)
   }
 
   /**

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index c0918a40d1..d5993656e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -422,5 +422,41 @@ private[sql] object ParquetTestData {
     val first = reader.read()
     assert(first != null)
   } */
+
+  // to test glob pattern (wildcard pattern matching for parquetFile input)
+  val testGlobDir = Utils.createTempDir()
+  val testGlobSubDir1 = Utils.createTempDir(testGlobDir.getPath)
+  val testGlobSubDir2 = Utils.createTempDir(testGlobDir.getPath)
+  val testGlobSubDir3 = Utils.createTempDir(testGlobDir.getPath)
+
+  def writeGlobFiles() = {
+    val subDirs = Array(testGlobSubDir1, testGlobSubDir2, testGlobSubDir3)
+
+    subDirs.foreach { dir =>
+      val path: Path = new Path(new Path(dir.toURI), new Path("part-r-0.parquet"))
+      val job = new Job()
+      val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
+      val writeSupport = new TestGroupWriteSupport(schema)
+      val writer = new ParquetWriter[Group](path, writeSupport)
+
+      for(i <- 0 until 15) {
+        val record = new SimpleGroup(schema)
+        if(i % 3 == 0) {
+          record.add(0, true)
+        } else {
+          record.add(0, false)
+        }
+        if(i % 5 == 0) {
+          record.add(1, 5)
+        }
+        record.add(2, "abc")
+        record.add(3, i.toLong << 33)
+        record.add(4, 2.5F)
+        record.add(5, 4.5D)
+        writer.write(record)
+      }
+      writer.close()
+    }
+  }
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index fa37d1f2ae..0e6fb57d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -437,10 +437,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
     val path = origPath.makeQualified(fs)
-    val children = fs.listStatus(path).filterNot { status =>
-      val name = status.getPath.getName
-      (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
-    }
+    val children =
+      fs
+        .globStatus(path)
+        .flatMap { status => if(status.isDir) fs.listStatus(status.getPath) else List(status) }
+        .filterNot { status =>
+          val name = status.getPath.getName
+          (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
+        }
 
     ParquetRelation.enableLogForwarding()

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 0e5635d3e9..074855389d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     ParquetTestData.writeNestedFile2()
     ParquetTestData.writeNestedFile3()
     ParquetTestData.writeNestedFile4()
+    ParquetTestData.writeGlobFiles()
     testRDD = parquetFile(ParquetTestData.testDir.toString)
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
@@ -110,6 +111,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(ParquetTestData.testNestedDir2)
     Utils.deleteRecursively(ParquetTestData.testNestedDir3)
     Utils.deleteRecursively(ParquetTestData.testNestedDir4)
+    Utils.deleteRecursively(ParquetTestData.testGlobDir)
     // here we should also unregister the table??

     setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, originalParquetFilterPushdownEnabled.toString)
@@ -1049,4 +1051,28 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         "Comparison predicate with null shouldn't be pushed down")
     }
   }
+
+  test("Import of simple Parquet files using glob wildcard pattern") {
+    val testGlobDir = ParquetTestData.testGlobDir.toString
+    val globPatterns = Array(testGlobDir + "/*/*", testGlobDir + "/spark-*/*", testGlobDir + "/?pa?k-*/*")
+    globPatterns.foreach { path =>
+      val result = parquetFile(path).collect()
+      assert(result.size === 45)
+      result.zipWithIndex.foreach {
+        case (row, index) => {
+          val checkBoolean =
+            if ((index % 15) % 3 == 0)
+              row(0) == true
+            else
+              row(0) == false
+          assert(checkBoolean === true, s"boolean field value in line $index did not match")
+          if ((index % 15) % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
+          assert(row(2) === "abc", s"string field value in line $index did not match")
+          assert(row(3) === ((index.toLong % 15) << 33), s"long value in line $index did not match")
+          assert(row(4) === 2.5F, s"float field value in line $index did not match")
+          assert(row(5) === 4.5D, s"double field value in line $index did not match")
+        }
+      }
+    }
+  }
 }
--
cgit v1.2.3
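
For reference, a minimal usage sketch of what this change enables from the Scala API; the master, application name, paths, and table name are illustrative only and do not come from the patch. It assumes a Spark 1.2-era SQLContext, whose parquetFile returns a SchemaRDD.

// Sketch only: with this patch, parquetFile also accepts a Hadoop glob pattern
// instead of a single file or directory. All paths here are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-glob-example").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Read every part file under every events-* subdirectory in one call.
val events = sqlContext.parquetFile("hdfs:///data/events-*/part-*.parquet")
events.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").collect().foreach(println)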
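
The core of the change in FileSystemHelper and ParquetTypesConverter is the switch from listStatus to globStatus followed by a one-level expansion of matched directories. The standalone helper below is a hypothetical restatement of that logic for illustration; only the globStatus/listStatus sequence mirrors the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical helper (not in the patch): expand a glob pattern to concrete file paths.
// globStatus resolves the wildcard; any matched directory is expanded one more level
// via listStatus, while plain file matches pass through unchanged.
def resolveParquetPaths(pattern: String, conf: Configuration): Seq[Path] = {
  val path = new Path(pattern)
  val fs: FileSystem = path.getFileSystem(conf)
  Option(fs.globStatus(path)).getOrElse(Array.empty[FileStatus])
    .flatMap { status =>
      if (status.isDir) fs.listStatus(status.getPath) else Array(status)
    }
    .map(_.getPath)
    .toSeq
}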