aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-05-24 16:39:33 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-05-24 16:39:33 -0700
commit26962c9340ac92b11d43e87200e699471d0b6330 (patch)
tree0122f5141df5f79036dd8e2b76cc91b47dc4b822 /core
parent6ea085169d8ba2d09ca9236273d65238b8411f04 (diff)
downloadspark-26962c9340ac92b11d43e87200e699471d0b6330.tar.gz
spark-26962c9340ac92b11d43e87200e699471d0b6330.tar.bz2
spark-26962c9340ac92b11d43e87200e699471d0b6330.zip
Automatically configure Netty port. This makes unit tests using
local-cluster pass. Previously they were failing because Netty was trying to bind to the same port for all processes. Pair programmed with @shivaram.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java68
-rw-r--r--core/src/main/scala/spark/network/netty/ShuffleSender.scala23
-rw-r--r--core/src/main/scala/spark/storage/BlockFetcherIterator.scala3
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala13
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerId.scala32
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala52
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala28
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala2
-rw-r--r--core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala14
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala6
10 files changed, 129 insertions, 112 deletions
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index 647b26bf8a..dd3f12561c 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -1,51 +1,83 @@
package spark.network.netty;
+import java.net.InetSocketAddress;
+
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelOption;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Server that accept the path of a file an echo back its content.
*/
class FileServer {
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
private ServerBootstrap bootstrap = null;
- private Channel channel = null;
- private PathResolver pResolver;
+ private ChannelFuture channelFuture = null;
+ private int port = 0;
+ private Thread blockingThread = null;
- public FileServer(PathResolver pResolver) {
- this.pResolver = pResolver;
- }
+ public FileServer(PathResolver pResolver, int port) {
+ InetSocketAddress addr = new InetSocketAddress(port);
- public void run(int port) {
// Configure the server.
bootstrap = new ServerBootstrap();
- try {
- bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+ bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
.channel(OioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.option(ChannelOption.SO_RCVBUF, 1500)
.childHandler(new FileServerChannelInitializer(pResolver));
- // Start the server.
- channel = bootstrap.bind(port).sync().channel();
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally{
+ // Start the server.
+ channelFuture = bootstrap.bind(addr);
+ this.port = addr.getPort();
+ }
+
+ /**
+ * Start the file server asynchronously in a new thread.
+ */
+ public void start() {
+ try {
+ blockingThread = new Thread() {
+ public void run() {
+ try {
+ Channel channel = channelFuture.sync().channel();
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ LOG.error("File server start got interrupted", e);
+ }
+ }
+ };
+ blockingThread.setDaemon(true);
+ blockingThread.start();
+ } finally {
bootstrap.shutdown();
}
}
+ public int getPort() {
+ return port;
+ }
+
public void stop() {
- if (channel!=null) {
- channel.close();
+ if (blockingThread != null) {
+ blockingThread.stop();
+ blockingThread = null;
+ }
+ if (channelFuture != null) {
+ channelFuture.channel().closeFuture();
+ channelFuture = null;
}
if (bootstrap != null) {
bootstrap.shutdown();
+ bootstrap = null;
}
}
}
diff --git a/core/src/main/scala/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/spark/network/netty/ShuffleSender.scala
index dc87fefc56..d6fa4b1e80 100644
--- a/core/src/main/scala/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/spark/network/netty/ShuffleSender.scala
@@ -5,23 +5,22 @@ import java.io.File
import spark.Logging
-private[spark] class ShuffleSender(val port: Int, val pResolver: PathResolver) extends Logging {
- val server = new FileServer(pResolver)
+private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
- Runtime.getRuntime().addShutdownHook(
- new Thread() {
- override def run() {
- server.stop()
- }
- }
- )
+ val server = new FileServer(pResolver, portIn)
+ server.start()
- def start() {
- server.run(port)
+ def stop() {
+ server.stop()
}
+
+ def port: Int = server.getPort()
}
+/**
+ * An application for testing the shuffle sender as a standalone program.
+ */
private[spark] object ShuffleSender {
def main(args: Array[String]) {
@@ -50,7 +49,5 @@ private[spark] object ShuffleSender {
}
}
val sender = new ShuffleSender(port, pResovler)
-
- sender.start()
}
}
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 88eed0d8c8..95308c7282 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -272,8 +272,7 @@ object BlockFetcherIterator {
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host))
- val cmId = new ConnectionManagerId(
- req.address.host, System.getProperty("spark.shuffle.sender.port", "6653").toInt)
+ val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
val cpier = new ShuffleCopier
cpier.getBlocks(cmId, req.blocks, putResult)
logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 40d608628e..d35c43f194 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -94,11 +94,16 @@ private[spark] class BlockManager(
private[storage] val diskStore: DiskStore =
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
+ private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
+ private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+ private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
val blockManagerId = BlockManagerId(
- executorId, connectionManager.id.host, connectionManager.id.port)
+ executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@@ -266,7 +271,6 @@ private[spark] class BlockManager(
}
}
-
/**
* Get locations of an array of blocks.
*/
@@ -274,7 +278,7 @@ private[spark] class BlockManager(
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).toArray
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
- return locations
+ locations
}
/**
@@ -971,8 +975,7 @@ private[spark] object BlockManager extends Logging {
assert (env != null || blockManagerMaster != null)
val locationBlockIds: Seq[Seq[BlockManagerId]] =
if (env != null) {
- val blockManager = env.blockManager
- blockManager.getLocationBlockIds(blockIds)
+ env.blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index f4a2181490..1e557d6148 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -7,18 +7,19 @@ import spark.Utils
/**
* This class represent an unique identifier for a BlockManager.
* The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the factory method in
- * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects.
+ * BlockManagerId objects can be created only using the apply method in
+ * the companion object. This allows de-duplication of ID objects.
* Also, constructor parameters are private to ensure that parameters cannot
* be modified from outside this class.
*/
private[spark] class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
- private var port_ : Int
+ private var port_ : Int,
+ private var nettyPort_ : Int
) extends Externalizable {
- private def this() = this(null, null, 0) // For deserialization only
+ private def this() = this(null, null, 0, 0) // For deserialization only
def executorId: String = executorId_
@@ -39,28 +40,32 @@ private[spark] class BlockManagerId private (
def port: Int = port_
+ def nettyPort: Int = nettyPort_
+
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
+ out.writeInt(nettyPort_)
}
override def readExternal(in: ObjectInput) {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
+ nettyPort_ = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, host, port)
+ override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
- override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
+ override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
override def equals(that: Any) = that match {
case id: BlockManagerId =>
- executorId == id.executorId && port == id.port && host == id.host
+ executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
case _ =>
false
}
@@ -69,8 +74,17 @@ private[spark] class BlockManagerId private (
private[spark] object BlockManagerId {
- def apply(execId: String, host: String, port: Int) =
- getCachedBlockManagerId(new BlockManagerId(execId, host, port))
+ /**
+ * Returns a [[spark.storage.BlockManagerId]] for the given configuraiton.
+ *
+ * @param execId ID of the executor.
+ * @param host Host name of the block manager.
+ * @param port Port of the block manager.
+ * @param nettyPort Optional port for the Netty-based shuffle sender.
+ * @return A new [[spark.storage.BlockManagerId]].
+ */
+ def apply(execId: String, host: String, port: Int, nettyPort: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 933eeaa216..57d4dafefc 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -82,22 +82,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
- var shuffleSender : Thread = null
- val thisInstance = this
+ var shuffleSender : ShuffleSender = null
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
val localDirs = createLocalDirs()
val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
- val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
-
addShutdownHook()
- if(useNetty){
- startShuffleBlockSender()
- }
-
def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
new DiskBlockObjectWriter(blockId, serializer, bufferSize)
@@ -274,8 +267,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
localDirs.foreach { localDir =>
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
}
- if (useNetty && shuffleSender != null)
+ if (shuffleSender != null) {
shuffleSender.stop
+ }
} catch {
case t: Throwable => logError("Exception while deleting local spark dirs", t)
}
@@ -283,39 +277,17 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
})
}
- private def startShuffleBlockSender() {
- try {
- val port = System.getProperty("spark.shuffle.sender.port", "6653").toInt
-
- val pResolver = new PathResolver {
- override def getAbsolutePath(blockId: String): String = {
- if (!blockId.startsWith("shuffle_")) {
- return null
- }
- thisInstance.getFile(blockId).getAbsolutePath()
- }
- }
- shuffleSender = new Thread {
- override def run() = {
- val sender = new ShuffleSender(port, pResolver)
- logInfo("Created ShuffleSender binding to port : "+ port)
- sender.start
- }
- }
- shuffleSender.setDaemon(true)
- shuffleSender.start
-
- } catch {
- case interrupted: InterruptedException =>
- logInfo("Runner thread for ShuffleBlockSender interrupted")
-
- case e: Exception => {
- logError("Error running ShuffleBlockSender ", e)
- if (shuffleSender != null) {
- shuffleSender.stop
- shuffleSender = null
+ private[storage] def startShuffleBlockSender(port: Int): Int = {
+ val pResolver = new PathResolver {
+ override def getAbsolutePath(blockId: String): String = {
+ if (!blockId.startsWith("shuffle_")) {
+ return null
}
+ DiskStore.this.getFile(blockId).getAbsolutePath()
}
}
+ shuffleSender = new ShuffleSender(port, pResolver)
+ logInfo("Created ShuffleSender binding to port : "+ shuffleSender.port)
+ shuffleSender.port
}
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index b5cedc0b68..6e585e1c3a 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -8,7 +8,7 @@ import spark.storage.BlockManagerId
import spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
-
+
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -45,13 +45,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
Array(compressedSize10000, compressedSize1000)))
val statuses = tracker.getServerStatuses(10, 0)
- assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
- (BlockManagerId("b", "hostB", 1000), size10000)))
+ assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000),
+ (BlockManagerId("b", "hostB", 1000, 0), size10000)))
tracker.stop()
}
@@ -64,14 +64,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simulatenous fetch failures
- tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
- tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
// The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
@@ -88,12 +88,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTracker()
masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
-
+
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor(
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
-
+
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
@@ -102,13 +102,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("a", "hostA", 1000), size1000)))
+ Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
- masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index fdee7ca384..58c834c735 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -326,5 +326,5 @@ object ShuffleSuite {
x + y
}
- class NonJavaSerializableClass(val value: Int)
+ class NonJavaSerializableClass(val value: Int) extends Serializable
}
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index 16554eac6e..30e6fef950 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -44,7 +44,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration)
- taskSets += taskSet
+ taskSets += taskSet
}
override def setListener(listener: TaskSchedulerListener) = {}
override def defaultParallelism() = 2
@@ -164,7 +164,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
}
}
-
+
/** Sends the rdd to the scheduler for scheduling. */
private def submit(
rdd: RDD[_],
@@ -174,7 +174,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
listener: JobListener = listener) {
runEvent(JobSubmitted(rdd, func, partitions, allowLocal, null, listener))
}
-
+
/** Sends TaskSetFailed to the scheduler. */
private def failed(taskSet: TaskSet, message: String) {
runEvent(TaskSetFailed(taskSet, message))
@@ -209,11 +209,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
assert(results === Map(0 -> 42))
}
-
+
test("run trivial job w/ dependency") {
val baseRdd = makeRdd(1, Nil)
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
- submit(finalRdd, Array(0))
+ submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@@ -250,7 +250,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
-
+
test("run trivial shuffle with fetch failure") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -398,6 +398,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
private def makeBlockManagerId(host: String): BlockManagerId =
- BlockManagerId("exec-" + host, host, 12345)
+ BlockManagerId("exec-" + host, host, 12345, 0)
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 71d1f0bcc8..bff2475686 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -99,9 +99,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("BlockManagerId object caching") {
- val id1 = BlockManagerId("e1", "XXX", 1)
- val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
- val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
+ val id1 = BlockManagerId("e1", "XXX", 1, 0)
+ val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1
+ val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")