author    Yin Huai <yhuai@databricks.com>   2015-02-27 01:01:32 +0800
committer Cheng Lian <lian@databricks.com>  2015-02-27 01:01:32 +0800
commit    192e42a2933eb283e12bfdfb46e2ef895228af4a (patch)
tree      514f7bad2ebbb1b761996cc64b6a537ea19f1b75 /sql
parent    f02394d06473889d0d7897c4583239e6e136ff46 (diff)
[SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true
Please see JIRA (https://issues.apache.org/jira/browse/SPARK-6016) for details of the bug.

Author: Yin Huai <yhuai@databricks.com>

Closes #4775 from yhuai/parquetFooterCache and squashes the following commits:

78787b1 [Yin Huai] Remove footerCache in FilteringParquetRowInputFormat.
dff6fba [Yin Huai] Failed unit test.
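The failure mode is easiest to see from the regression test added in this patch. Below is a minimal driver-program sketch of the same scenario; it assumes a Spark 1.3-era HiveContext bound to sqlContext (with sc as the SparkContext), where jsonRDD and the three-argument saveAsTable used by the test are available:

    import org.apache.spark.sql.SaveMode

    // Write the table with two partitions: two Parquet part-files, whose footers
    // end up in the footer cache when spark.sql.parquet.cacheMetadata=true (the default).
    val df1 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
    sqlContext.sql("select * from spark_6016_fix").collect()

    // Overwrite with four partitions: four new part-files. Before this fix, two stale
    // cached footers were merged with the freshly read ones, and the next read failed.
    val df2 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
    sqlContext.sql("select * from spark_6016_fix").collect()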
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala             |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala          | 27
3 files changed, 42 insertions(+), 42 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4dc13b036c..9061d3f5fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
- private var footers: JList[Footer] = _
-
private var fileStatuses = Map.empty[Path, FileStatus]
override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
}
}
- override def getFooters(jobContext: JobContext): JList[Footer] = {
- import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
- if (footers eq null) {
- val conf = ContextUtil.getConfiguration(jobContext)
- val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
- val statuses = listStatus(jobContext)
- fileStatuses = statuses.map(file => file.getPath -> file).toMap
- if (statuses.isEmpty) {
- footers = Collections.emptyList[Footer]
- } else if (!cacheMetadata) {
- // Read the footers from HDFS
- footers = getFooters(conf, statuses)
- } else {
- // Read only the footers that are not in the footerCache
- val foundFooters = footerCache.getAllPresent(statuses)
- val toFetch = new ArrayList[FileStatus]
- for (s <- statuses) {
- if (!foundFooters.containsKey(s)) {
- toFetch.add(s)
- }
- }
- val newFooters = new mutable.HashMap[FileStatus, Footer]
- if (toFetch.size > 0) {
- val startFetch = System.currentTimeMillis
- val fetched = getFooters(conf, toFetch)
- logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
- for ((status, i) <- toFetch.zipWithIndex) {
- newFooters(status) = fetched.get(i)
- }
- footerCache.putAll(newFooters)
- }
- footers = new ArrayList[Footer](statuses.size)
- for (status <- statuses) {
- footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
- }
- }
- }
+ // This is only a temporary solution since we need to use fileStatuses in
+ // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+ // two methods.
+ override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+ // First set fileStatuses.
+ val statuses = listStatus(jobContext)
+ fileStatuses = statuses.map(file => file.getPath -> file).toMap
- footers
+ super.getSplits(jobContext)
}
// TODO Remove this method and related code once PARQUET-16 is fixed
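After this change, FilteringParquetRowInputFormat no longer keeps a footers field or the shared footerCache; it only records the file statuses needed by getClientSideSplits and getTaskSideSplits before delegating split computation to the parent class. A sketch of the class after the patch, reconstructed from the hunks above (other members and imports elided):

    private[parquet] class FilteringParquetRowInputFormat
      extends parquet.hadoop.ParquetInputFormat[Row] with Logging {

      // Path -> FileStatus map consumed by getClientSideSplits and getTaskSideSplits.
      private var fileStatuses = Map.empty[Path, FileStatus]

      // Temporary workaround: populate fileStatuses, then let the parent compute splits.
      override def getSplits(jobContext: JobContext): JList[InputSplit] = {
        val statuses = listStatus(jobContext)
        fileStatuses = statuses.map(file => file.getPath -> file).toMap
        super.getSplits(jobContext)
      }
    }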
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 16b771344b..e648618468 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _
// Parquet footer cache.
- private var footers: Map[FileStatus, Footer] = _
+ var footers: Map[FileStatus, Footer] = _
// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
} else {
metadataCache.dataStatuses.toSeq
}
+ val selectedFooters = selectedFiles.map(metadataCache.footers)
// FileInputFormat cannot handle empty lists.
if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
@transient
val cachedStatus = selectedFiles
+ @transient
+ val cachedFooters = selectedFooters
+
// Overridden so we can inject our own cached file statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+ override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
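On the read path, ParquetRelation2 now looks up the footers of the selected part-files from its own refreshed metadataCache and hands both the statuses and the footers to the input format through an anonymous subclass, so jobs never see footers left over from a previous version of the table. Consolidated from the hunks above (a sketch, not the full method; cachedStatus and cachedFooters are the @transient copies shown earlier):

    val selectedFooters = selectedFiles.map(metadataCache.footers)

    val inputFormat = if (cacheMetadata) {
      new FilteringParquetRowInputFormat {
        // Serve the relation's cached statuses and footers instead of re-listing
        // the file system or re-reading footers for every job.
        override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
        override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
      }
    } else {
      new FilteringParquetRowInputFormat
    }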
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 80fd5cda20..6a9d9daf67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
)
""")
}
+
+ test("SPARK-6016 make sure to use the latest footers") {
+ sql("drop table if exists spark_6016_fix")
+
+ // Create a DataFrame with two partitions, so the created table will have two parquet files.
+ val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+ df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ checkAnswer(
+ sql("select * from spark_6016_fix"),
+ (1 to 10).map(i => Row(i))
+ )
+
+ // Create a DataFrame with four partitions, so the created table will have four parquet files.
+ val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+ df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ // With the bug of SPARK-6016, we would still be caching two outdated footers for df1. Then,
+ // since the new table has four parquet files, we would read new footers for only two files
+ // and merge the metadata of these four footers (two outdated ones and two fresh ones),
+ // which would cause an error.
+ checkAnswer(
+ sql("select * from spark_6016_fix"),
+ (1 to 10).map(i => Row(i))
+ )
+
+ sql("drop table spark_6016_fix")
+ }
}
class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
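For anyone verifying the fix interactively, the relevant setting is the one named in the commit title; toggling it switches between cached footers and re-reading footers on every query (setConf is the standard SQLContext call, and the key is taken from the title):

    // Footer/metadata caching on (the default) -- the case this patch fixes.
    sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")
    // Disable caching to always read footers from the file system.
    sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")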