From 1e5c51f336b90cd1eed43e9c6cf00faee696174c Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 18 Dec 2016 09:08:02 +0000
Subject: [SPARK-18827][CORE] Fix cannot read broadcast on disk

## What changes were proposed in this pull request?

Since https://github.com/apache/spark/pull/15056, a `NoSuchElementException` is thrown when a broadcast cannot be cached in memory, because that change does not handle the `!unrolled.hasNext` case in the `next()` function of `PartiallyUnrolledIterator`.

This change handles the `!unrolled.hasNext` case and also checks `hasNext` before calling `next` on the iterator returned by `blockManager.getLocalValues`, to make the code more robust. With this pull request, a broadcast can be cached and read back even when it does not fit in memory.
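The core of the fix is a defensive `hasNext` check around the partially unrolled values. Below is a minimal sketch of that guard for illustration only; it assumes a simplified iterator with an `unrolled` prefix and a `rest` remainder, and is not the actual `PartiallyUnrolledIterator` implementation in `MemoryStore.scala`:

```scala
// Simplified stand-in for an iterator whose first values were unrolled into
// memory (`unrolled`) while the remainder (`rest`) still has to be read back,
// e.g. from disk. Names and structure are illustrative, not Spark's code.
class GuardedUnrolledIterator[T](unrolled: Iterator[T], rest: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = unrolled.hasNext || rest.hasNext

  override def next(): T = {
    // Guard the unrolled prefix: once it is exhausted, fall through to the
    // remaining values instead of calling next() on an empty iterator, which
    // is what produced the NoSuchElementException in the exception log below.
    if (unrolled.hasNext) unrolled.next() else rest.next()
  }
}
```

On the read side the same rule applies: code that consumes the iterator returned by `blockManager.getLocalValues` (as `TorrentBroadcast.readBroadcastBlock` does) should check `hasNext` before calling `next()` rather than assume the block always yields a value.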
Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
    at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
    at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
    at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
    at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    ... 12 more
```

## How was this patch tested?

Added a unit test that forces the broadcast block onto disk by enabling legacy memory mode with `spark.storage.memoryFraction` set to `0.0`, then reads the broadcast value back.

Author: Yuming Wang

Closes #16252 from wangyum/SPARK-18827.
---
 .../scala/org/apache/spark/broadcast/BroadcastSuite.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

(limited to 'core/src/test')

diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 973676398a..6646068d50 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
     sc.stop()
   }
 
+  test("Cache broadcast to disk") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.storage.memoryFraction", "0.0")
+    sc = new SparkContext(conf)
+    val list = List[Int](1, 2, 3, 4)
+    val broadcast = sc.broadcast(list)
+    assert(broadcast.value.sum === 10)
+  }
+
   /**
    * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
    *
-- 
cgit v1.2.3