author     Bogdan Raducanu <bogdan@databricks.com>    2017-04-10 21:56:21 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>    2017-04-10 21:56:21 +0200
commit     f6dd8e0e1673aa491b895c1f0467655fa4e9d52f
tree       6f051012078833326c57b74eeac64281c522f2be /sql
parent     a26e3ed5e414d0a350cfe65dd511b154868b9f1d
[SPARK-20280][CORE] FileStatusCache Weigher integer overflow
## What changes were proposed in this pull request?

Weigher.weigh needs to return an Int, but an Array[FileStatus] can have an estimated size greater than Int.MaxValue. To avoid the overflow, the computed weight is scaled down by a factor of 32, and the maximumWeight of the cache is scaled down by the same factor.

## How was this patch tested?

New test in FileIndexSuite.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17591 from bogdanrdc/SPARK-20280.
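For context, the overflow this patch guards against can be reproduced in isolation. Below is a minimal sketch, not the Spark code in the diff; the object name, `entrySizeBytes`, and the two helper methods are assumptions for illustration only. It contrasts an Int-returning weigher with and without the weightScale division:

```scala
// Minimal sketch of the overflow scenario; names here are hypothetical
// and chosen only to illustrate the arithmetic behind the patch.
object WeigherOverflowSketch {
  // Pretend each cached value is estimated at ~1 GB, as the new test
  // fakes via KnownSizeEstimation.
  val entrySizeBytes: Long = 1000L * 1000 * 1000

  // Naive weigher: 3 * 1e9 = 3e9 exceeds Int.MaxValue (2147483647),
  // so .toInt wraps around to a negative weight.
  def naiveWeigh(numEntries: Int): Int =
    (numEntries * entrySizeBytes).toInt

  // Scaled weigher: dividing by 32 keeps weights for up to ~64 GB of
  // estimated size within Int range, with a clamp as a last resort.
  def scaledWeigh(numEntries: Int, weightScale: Int = 32): Int = {
    val scaled = numEntries * entrySizeBytes / weightScale
    if (scaled > Int.MaxValue) Int.MaxValue else scaled.toInt
  }

  def main(args: Array[String]): Unit = {
    println(naiveWeigh(3))  // -1294967296: integer overflow
    println(scaledWeigh(3)) // 93750000: fits comfortably in an Int
  }
}
```

Because maximumWeight is divided by the same weightScale, the byte budget configured via spark.sql.hive.filesourcePartitionFileCacheSize is effectively unchanged; only the unit of a weight becomes 32 bytes instead of 1.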
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala | 47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala  | 16
2 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 5d97558633..aea27bd4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
// Opaque object that uniquely identifies a shared cache user
private type ClientId = Object
+
private val warnedAboutEviction = new AtomicBoolean(false)
// we use a composite cache key in order to distinguish entries inserted by different clients
- private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
- .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+ private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+ // [[Weigher]].weigh returns an Int, so on its own the cache could only
+ // hold objects < 2GB. Instead, the weight is divided by this factor
+ // (which is smaller than the size of one [[FileStatus]]),
+ // so the cache supports objects of up to 64GB in size.
+ val weightScale = 32
+ val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
- (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
- }})
- .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
- override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
- : Unit = {
+ val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+ if (estimate > Int.MaxValue) {
+ logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+ s"${Int.MaxValue.toLong * weightScale}.")
+ Int.MaxValue
+ } else {
+ estimate.toInt
+ }
+ }
+ }
+ val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+ override def onRemoval(
+ removed: RemovalNotification[(ClientId, Path),
+ Array[FileStatus]]): Unit = {
if (removed.getCause == RemovalCause.SIZE &&
- warnedAboutEviction.compareAndSet(false, true)) {
+ warnedAboutEviction.compareAndSet(false, true)) {
logWarning(
"Evicting cached table partition metadata from memory due to size constraints " +
- "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
- "This may impact query planning performance.")
+ "(spark.sql.hive.filesourcePartitionFileCacheSize = "
+ + maxSizeInBytes + " bytes). This may impact query planning performance.")
}
- }})
- .maximumWeight(maxSizeInBytes)
- .build[(ClientId, Path), Array[FileStatus]]()
+ }
+ }
+ CacheBuilder.newBuilder()
+ .weigher(weigher)
+ .removalListener(removalListener)
+ .maximumWeight(maxSizeInBytes / weightScale)
+ .build[(ClientId, Path), Array[FileStatus]]()
+ }
+
/**
* @return a FileStatusCache that does not share any entries with any other client, but does
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 00f5d5db8f..a9511cbd9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
class FileIndexSuite extends SharedSQLContext {
@@ -220,6 +221,21 @@ class FileIndexSuite extends SharedSQLContext {
assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
}
}
+
+ test("SPARK-20280 - FileStatusCache with a partition with very many files") {
+ /* fake the size, otherwise we need to allocate 2GB of data to trigger this bug */
+ class MyFileStatus extends FileStatus with KnownSizeEstimation {
+ override def estimatedSize: Long = 1000 * 1000 * 1000
+ }
+ /* files * MyFileStatus.estimatedSize should overflow to negative integer
+ * so, make it between 2bn and 4bn
+ */
+ val files = (1 to 3).map { i =>
+ new MyFileStatus()
+ }
+ val fileStatusCache = FileStatusCache.getOrCreate(spark)
+ fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
+ }
}
class FakeParentPathFileSystem extends RawLocalFileSystem {