author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>   2013-06-13 14:46:25 -0700
committer  Shivaram Venkataraman <shivaram@eecs.berkeley.edu>   2013-06-13 14:46:25 -0700
commit     1d9f0df0652f455145d2dfed43a9407df6de6c43 (patch)
tree       a7c5c67de82617c632fa2c0d10a2916df49b0044 /core/src
parent     5da4287b1dbfb8cfcec9c915926d8a8755bd52b2 (diff)
Fix some comments and style
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/java/spark/network/netty/FileClient.java        | 2
-rw-r--r--  core/src/main/scala/spark/network/netty/ShuffleCopier.scala   | 8
-rw-r--r--  core/src/main/scala/spark/storage/BlockFetcherIterator.scala  | 6
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala              | 3
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala                   | 3
5 files changed, 8 insertions, 14 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
index 517772202f..a4bb4bc701 100644
--- a/core/src/main/java/spark/network/netty/FileClient.java
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -30,7 +30,7 @@ class FileClient {
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) // Disable connect timeout
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(new FileClientChannelInitializer(handler));
}
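
The comment dropped above was misleading: CONNECT_TIMEOUT_MILLIS sets a finite connect timeout rather than disabling one. A minimal Scala sketch of the same Netty Bootstrap configuration, for illustration only; the no-op initializer stands in for Spark's FileClientChannelInitializer, and the 60000 ms literal mirrors the spark.shuffle.netty.connect.timeout default used by ShuffleCopier below.

    import io.netty.bootstrap.Bootstrap
    import io.netty.channel.{ChannelInitializer, ChannelOption}
    import io.netty.channel.oio.OioEventLoopGroup
    import io.netty.channel.socket.SocketChannel
    import io.netty.channel.socket.oio.OioSocketChannel

    val bootstrap = new Bootstrap()
      .group(new OioEventLoopGroup())
      .channel(classOf[OioSocketChannel])
      .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
      .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
      // A finite timeout: connect() fails after 60s instead of hanging forever.
      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
      .handler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = () // no-op stand-in
      })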
diff --git a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
index afb2cdbb3a..8d5194a737 100644
--- a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
@@ -18,8 +18,9 @@ private[spark] class ShuffleCopier extends Logging {
resultCollectCallback: (String, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val fc = new FileClient(handler,
- System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt)
+ val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val fc = new FileClient(handler, connectTimeout)
+
try {
fc.init()
fc.connect(host, port)
@@ -29,8 +30,7 @@ private[spark] class ShuffleCopier extends Logging {
} catch {
// Handle any socket-related exceptions in FileClient
case e: Exception => {
- logError("Shuffle copy of block " + blockId + " from " + host + ":" + port +
- " failed", e)
+ logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + " failed", e)
handler.handleError(blockId)
}
}
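
Pulling the property lookup into a named val makes the read-with-default pattern visible at a glance. The same pattern in isolation (a sketch, not ShuffleCopier itself):

    // System.getProperty returns the second argument when the key is unset;
    // toInt throws NumberFormatException for a malformed value, so a bad
    // setting fails fast at connect time instead of silently misbehaving.
    val connectTimeout: Int =
      System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt

    // e.g. lowering the timeout to 10s before the first fetch:
    // System.setProperty("spark.shuffle.netty.connect.timeout", "10000")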
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index bb78207c9f..bec876213e 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -322,11 +322,7 @@ object BlockFetcherIterator {
override def next(): (String, Option[Iterator[Any]]) = {
resultsGotten += 1
val result = results.take()
- // if all the results has been retrieved, shutdown the copiers
- // NO need to stop the copiers if we got all the blocks ?
- // if (resultsGotten == _numBlocksToFetch && copiers != null) {
- // stopCopiers()
- // }
+ // If all the results have been retrieved, copiers will exit automatically
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}
}
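
The replaced comment records a design decision: the copier threads exit on their own once their fetches complete, so next() only drains the results queue and never has to stop them. An illustrative sketch of that pattern, with hypothetical names standing in for Spark's internals:

    import java.util.concurrent.LinkedBlockingQueue

    // Hypothetical stand-ins for the fetch results and the copier threads.
    case class FetchResult(blockId: String, failed: Boolean)

    val numBlocksToFetch = 5
    val results = new LinkedBlockingQueue[FetchResult]()

    // A "copier" that terminates by itself after delivering its results.
    val copier = new Thread(new Runnable {
      def run() {
        for (i <- 1 to numBlocksToFetch) {
          results.put(FetchResult("block_" + i, failed = false))
        }
      }
    })
    copier.start()

    var resultsGotten = 0
    while (resultsGotten < numBlocksToFetch) {
      val result = results.take() // blocks until a copier delivers a result
      resultsGotten += 1
    }
    // No explicit stopCopiers() needed: the copier thread has already exited.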
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 0af6e4a359..15ab840155 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -212,10 +212,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val file = getFile(blockId)
if (!allowAppendExisting && file.exists()) {
// NOTE(shivaram): Delete the file if it exists. This might happen if a ShuffleMap task
- // was rescheduled on the same machine as the old task ?
+ // was rescheduled on the same machine as the old task.
logWarning("File for block " + blockId + " already exists on disk: " + file + ". Deleting")
file.delete()
- // throw new Exception("File for block " + blockId + " already exists on disk: " + file)
}
file
}
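
The question mark is gone because the scenario is understood: a ShuffleMap task rescheduled onto the machine that ran the old task finds a stale block file on disk, and deleting it is the right recovery. A minimal sketch of the delete-if-exists pattern, with an illustrative flat directory layout rather than DiskStore's hashed subdirectories:

    import java.io.File

    def createFile(dir: File, blockId: String, allowAppendExisting: Boolean = false): File = {
      val file = new File(dir, blockId)
      if (!allowAppendExisting && file.exists()) {
        // Stale output from an earlier attempt of the same task: delete it
        // rather than failing the rescheduled task.
        println("File for block " + blockId + " already exists on disk: " + file + ". Deleting")
        file.delete()
      }
      file
    }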
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 33b02fff80..1916885a73 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -376,8 +376,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val a = sc.parallelize(1 to 4, NUM_BLOCKS)
val b = a.map(x => (x, x*2))
- // NOTE: The default Java serializer doesn't create zero-sized blocks.
- // So, use Kryo
+ // NOTE: The default Java serializer should create zero-sized blocks
val c = new ShuffledRDD(b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
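
The rewritten comment asserts that the default Java serializer now emits zero-sized blocks itself, so the test no longer needs to switch to Kryo. The arithmetic the test relies on: 4 keys hashed into 10 partitions leave at least 6 partitions, and therefore at least 6 shuffle blocks, empty. A sketch of observing that directly, assuming the suite's SparkContext sc and this era's spark._ API:

    import spark.SparkContext._ // pair-RDD implicits for partitionBy
    import spark.HashPartitioner

    val a = sc.parallelize(1 to 4, 4)
    val b = a.map(x => (x, x * 2))
    // glom materializes each partition as an array, so empty ones are countable.
    val partitions = b.partitionBy(new HashPartitioner(10)).glom().collect()
    assert(partitions.count(_.isEmpty) >= 6) // 4 keys cannot fill 10 partitions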