author    Volodymyr Lyubinets <vlyubin@gmail.com>   2015-03-16 12:13:18 -0700
committer Aaron Davidson <aaron@databricks.com>     2015-03-16 12:13:18 -0700
commit    d19efeddc0cb710c9496af11e447d39e1ad61b31 (patch)
tree      76b3c84f8072cd9bb06be78d24085d2212f662a8
parent    12a345adcbaee359199ddfed4f41bf0e19d66d48 (diff)
[SPARK-6330] Fix filesystem bug in newParquet relation
If I'm running this locally and my path points to S3, this currently errors out
because the wrong FileSystem is used. I tested a scenario that previously didn't
work, and this change fixed the issue.

Author: Volodymyr Lyubinets <vlyubin@gmail.com>

Closes #5020 from vlyubin/parquertbug and squashes the following commits:

a645ad5 [Volodymyr Lyubinets] Fix filesystem bug in newParquet relation
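The failure mode, in brief: FileSystem.get(conf) returns the filesystem named by
fs.defaultFS, so a locally configured default (file:///) cannot serve S3 paths,
while FileSystem.get(uri, conf) resolves the implementation from the path's own
scheme. A minimal sketch of the distinction using the standard Hadoop API; the
bucket name is hypothetical, and an S3 connector plus credentials would have to
be on the classpath for the second lookup to actually run:

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FsResolutionSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()

        // FileSystem.get(conf) resolves fs.defaultFS, so with a local default
        // it returns a LocalFileSystem no matter where the table data lives.
        val defaultFs = FileSystem.get(conf)
        println(defaultFs.getUri) // e.g. file:///

        // Resolving against the path's own URI picks the implementation that
        // matches its scheme instead (hypothetical bucket name).
        val p = "s3n://some-bucket/warehouse/table"
        val fs = FileSystem.get(URI.create(p), conf)
        println(fs.makeQualified(new Path(p)))
      }
    }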
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala  5
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 234e6bb844..c38b6e8c61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 import java.io.IOException
 import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, List => JList}
@@ -244,11 +245,10 @@ private[sql] case class ParquetRelation2(
    * Refreshes `FileStatus`es, footers, partition spec, and table schema.
    */
   def refresh(): Unit = {
-    val fs = FileSystem.get(sparkContext.hadoopConfiguration)
-
     // Support either reading a collection of raw Parquet part-files, or a collection of folders
     // containing Parquet files (e.g. partitioned Parquet table).
     val baseStatuses = paths.distinct.map { p =>
+      val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
       val qualified = fs.makeQualified(new Path(p))
 
       if (!fs.exists(qualified) && maybeSchema.isDefined) {
@@ -262,6 +262,7 @@ private[sql] case class ParquetRelation2(
     // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
     val leaves = baseStatuses.flatMap { f =>
+      val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration)
       SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
         isSummaryFile(f.getPath) ||
           !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
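The second hunk applies the same per-path resolution when enumerating leaf files:
each base FileStatus carries its own URI, so the matching FileSystem is looked up
before listing beneath it. A minimal standalone sketch of that pattern, using the
plain Hadoop listStatus call (non-recursive) in place of Spark's
SparkHadoopUtil.get.listLeafStatuses:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem}

    // Each base directory may live on a different filesystem, so resolve the
    // FileSystem from the status's own URI before listing under it.
    def leafFiles(baseStatuses: Seq[FileStatus], conf: Configuration): Seq[FileStatus] =
      baseStatuses.flatMap { base =>
        val fs = FileSystem.get(base.getPath.toUri, conf)
        // listStatus only looks one level down; Spark's helper additionally
        // recurses into subdirectories, which this sketch does not.
        fs.listStatus(base.getPath).toSeq.filterNot(_.isDirectory)
      }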