author    | chutium <teng.qiu@gmail.com>              | 2014-08-27 13:13:04 -0700
committer | Michael Armbrust <michael@databricks.com> | 2014-08-27 13:13:04 -0700
commit    | 48f42781dedecd38ddcb2dcf67dead92bb4318f5 (patch)
tree      | 1333a8f8c4743d5bc2af15431ebdd969995773f6 /sql
parent    | 191d7cf2a655d032f160b9fa181730364681d0e7 (diff)
[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as a parameter
The check ```if (!fs.getFileStatus(path).isDir) throw Exception``` no longer makes sense after commit #1370.
Be careful if someone is working on SPARK-2551: make sure the new change still passes the test case ```test("Read a parquet file instead of a directory")```.
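For illustration, a minimal usage sketch of the new behavior, assuming the Spark 1.x API used in this patch; ```TestEntry``` and the ```/tmp``` output directory are placeholders invented for this example, not taken from the commit:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only (Spark 1.x API, matching this patch). TestEntry and the
// /tmp output directory are placeholders invented for this example.
case class TestEntry(key: Int, value: String)

object ParquetSingleFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parquet-single-file").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD conversion

    // Write a directory containing a single Parquet part-file.
    val outDir = "/tmp/single_file_demo"
    sc.parallelize(1 to 100).map(i => TestEntry(i, s"val_$i"))
      .coalesce(1).saveAsParquetFile(outDir)

    // Reading the directory already worked before this patch.
    val fromDir = sqlContext.parquetFile(outDir)

    // After this patch, pointing parquetFile at one part-file inside the
    // directory also works, instead of throwing IllegalArgumentException.
    val dir = new Path(outDir)
    val fs = dir.getFileSystem(sc.hadoopConfiguration)
    val partName = fs.listStatus(dir).map(_.getPath.getName).find(_.endsWith(".parquet")).get
    val fromFile = sqlContext.parquetFile(s"$outDir/$partName")

    fromFile.registerTempTable("t")
    sqlContext.sql("SELECT COUNT(*) FROM t").collect().foreach(println)
    assert(fromFile.count() == fromDir.count())
    sc.stop()
  }
}
```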
Author: chutium <teng.qiu@gmail.com>
Closes #2044 from chutium/parquet-singlefile and squashes the following commits:
4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as a parameter
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala      |  7
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 27
2 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 1a52377651..2941b97935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -394,17 +394,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 4219cc0800..42923b6a28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -420,8 +419,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
     }
     Utils.deleteRecursively(file)
   }
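The reason the removed ```isDir``` guard is safe to drop: Hadoop's ```FileSystem.listStatus```, when called on a regular file, returns a one-element array holding that file's own ```FileStatus```, so the ```children``` listing above covers both a directory of part-files and a single file. A standalone sketch of that behavior (the path is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: listStatus on a regular file yields exactly one FileStatus,
// the file's own, which is what lets the patched code treat a single Parquet
// file like a one-entry directory listing. The path is a placeholder.
object ListStatusDemo {
  def main(args: Array[String]): Unit = {
    val path = new Path("/tmp/single_file_demo/part-r-1.parquet")
    val fs: FileSystem = path.getFileSystem(new Configuration())
    fs.listStatus(path).foreach { status =>
      println(s"${status.getPath} isDir=${status.isDir}")
    }
  }
}
```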