From 1d9f0df0652f455145d2dfed43a9407df6de6c43 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 13 Jun 2013 14:46:25 -0700 Subject: Fix some comments and style --- core/src/main/java/spark/network/netty/FileClient.java | 2 +- core/src/main/scala/spark/network/netty/ShuffleCopier.scala | 8 ++++---- core/src/main/scala/spark/storage/BlockFetcherIterator.scala | 6 +----- core/src/main/scala/spark/storage/DiskStore.scala | 3 +-- core/src/test/scala/spark/ShuffleSuite.scala | 3 +-- 5 files changed, 8 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java index 517772202f..a4bb4bc701 100644 --- a/core/src/main/java/spark/network/netty/FileClient.java +++ b/core/src/main/java/spark/network/netty/FileClient.java @@ -30,7 +30,7 @@ class FileClient { .channel(OioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) // Disable connect timeout + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) .handler(new FileClientChannelInitializer(handler)); } diff --git a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala index afb2cdbb3a..8d5194a737 100644 --- a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala @@ -18,8 +18,9 @@ private[spark] class ShuffleCopier extends Logging { resultCollectCallback: (String, Long, ByteBuf) => Unit) { val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback) - val fc = new FileClient(handler, - System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt) + val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt + val fc = new FileClient(handler, connectTimeout) + try { fc.init() fc.connect(host, port) @@ -29,8 +30,7 @@ private[spark] class ShuffleCopier extends Logging { } catch { // Handle any socket-related exceptions in FileClient case e: Exception => { - logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + - " failed", e) + logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + " failed", e) handler.handleError(blockId) } } diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala index bb78207c9f..bec876213e 100644 --- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala @@ -322,11 +322,7 @@ object BlockFetcherIterator { override def next(): (String, Option[Iterator[Any]]) = { resultsGotten += 1 val result = results.take() - // if all the results has been retrieved, shutdown the copiers - // NO need to stop the copiers if we got all the blocks ? - // if (resultsGotten == _numBlocksToFetch && copiers != null) { - // stopCopiers() - // } + // If all the results has been retrieved, copiers will exit automatically (result.blockId, if (result.failed) None else Some(result.deserialize())) } } diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 0af6e4a359..15ab840155 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -212,10 +212,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val file = getFile(blockId) if (!allowAppendExisting && file.exists()) { // NOTE(shivaram): Delete the file if it exists. This might happen if a ShuffleMap task - // was rescheduled on the same machine as the old task ? + // was rescheduled on the same machine as the old task. logWarning("File for block " + blockId + " already exists on disk: " + file + ". Deleting") file.delete() - // throw new Exception("File for block " + blockId + " already exists on disk: " + file) } file } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 33b02fff80..1916885a73 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -376,8 +376,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { val a = sc.parallelize(1 to 4, NUM_BLOCKS) val b = a.map(x => (x, x*2)) - // NOTE: The default Java serializer doesn't create zero-sized blocks. - // So, use Kryo + // NOTE: The default Java serializer should create zero-sized blocks val c = new ShuffledRDD(b, new HashPartitioner(10)) val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId -- cgit v1.2.3