Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala  47
1 file changed, 34 insertions(+), 13 deletions(-)
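
In brief: Guava's Weigher.weigh returns an Int, so with weights measured in raw bytes no cache entry heavier than Int.MaxValue (about 2GB) can be represented. The change below divides each estimated weight by weightScale = 32 and divides maximumWeight by the same factor, keeping the two consistent while raising the per-entry ceiling to Int.MaxValue * 32 ≈ 2^31 * 2^5 bytes = 64GB. Estimates that still overflow are clamped to Int.MaxValue, with a warning.
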
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 5d97558633..aea27bd4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
// Opaque object that uniquely identifies a shared cache user
private type ClientId = Object
+
private val warnedAboutEviction = new AtomicBoolean(false)
// we use a composite cache key in order to distinguish entries inserted by different clients
- private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
- .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+ private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+ // [[Weigher]].weigh returns an Int, so on its own the cache could only
+ // hold objects smaller than 2GB. Instead, each weight is divided by this
+ // factor (which is smaller than the size of one [[FileStatus]]), so
+ // objects up to 64GB in size are supported.
+ val weightScale = 32
+ val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
- (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
- }})
- .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
- override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
- : Unit = {
+ val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+ if (estimate > Int.MaxValue) {
+ logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+ s"${Int.MaxValue.toLong * weightScale}.")
+ Int.MaxValue
+ } else {
+ estimate.toInt
+ }
+ }
+ }
+ val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+ override def onRemoval(
+ removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]): Unit = {
if (removed.getCause == RemovalCause.SIZE &&
- warnedAboutEviction.compareAndSet(false, true)) {
+ warnedAboutEviction.compareAndSet(false, true)) {
logWarning(
"Evicting cached table partition metadata from memory due to size constraints " +
- "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
- "This may impact query planning performance.")
+ "(spark.sql.hive.filesourcePartitionFileCacheSize = "
+ + maxSizeInBytes + " bytes). This may impact query planning performance.")
}
- }})
- .maximumWeight(maxSizeInBytes)
- .build[(ClientId, Path), Array[FileStatus]]()
+ }
+ }
+ CacheBuilder.newBuilder()
+ .weigher(weigher)
+ .removalListener(removalListener)
+ .maximumWeight(maxSizeInBytes / weightScale)
+ .build[(ClientId, Path), Array[FileStatus]]()
+ }
+
/**
* @return a FileStatusCache that does not share any entries with any other client, but does
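
For experimentation outside of Spark, the following is a minimal, self-contained Scala sketch of the same weight-scaling pattern against Guava's CacheBuilder. It is not the patched Spark code: the String / Array[Byte] key and value types, the use of the array length in place of SizeEstimator.estimate, and the 4GB budget are illustrative assumptions; only the divide-by-weightScale technique mirrors the diff above.

// Hypothetical, standalone demonstration of the weight-scaling trick from
// the patch above. Guava's Weigher.weigh returns an Int, so per-entry
// weights are capped at Int.MaxValue; dividing each estimate (and
// maximumWeight) by a common scale factor raises the effective cap by
// that factor.
import com.google.common.cache.{Cache, CacheBuilder, RemovalCause,
  RemovalListener, RemovalNotification, Weigher}

object WeightScaleDemo {
  def main(args: Array[String]): Unit = {
    val weightScale = 32
    // Illustrative budget; Spark reads its budget from
    // spark.sql.hive.filesourcePartitionFileCacheSize instead.
    val maxSizeInBytes = 4L * 1024 * 1024 * 1024

    val weigher = new Weigher[String, Array[Byte]] {
      override def weigh(key: String, value: Array[Byte]): Int = {
        // The array length stands in for SizeEstimator.estimate here.
        val estimate = (key.length.toLong + value.length) / weightScale
        // Clamp on overflow, as the patch does.
        math.min(estimate, Int.MaxValue.toLong).toInt
      }
    }

    val removalListener = new RemovalListener[String, Array[Byte]] {
      override def onRemoval(n: RemovalNotification[String, Array[Byte]]): Unit = {
        if (n.getCause == RemovalCause.SIZE) {
          println(s"evicted ${n.getKey} due to size constraints")
        }
      }
    }

    val cache: Cache[String, Array[Byte]] = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(removalListener)
      // The budget must be expressed in the same scaled units as the weigher.
      .maximumWeight(maxSizeInBytes / weightScale)
      .build[String, Array[Byte]]()

    cache.put("partition-0", new Array[Byte](1 << 20))
    println(s"entries cached: ${cache.size()}")
  }
}

Scaling both the per-entry weights and maximumWeight by the same constant preserves the eviction behavior (up to rounding of at most weightScale - 1 bytes per entry), which is why the patch can widen the representable range without changing cache semantics.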