author     Marcelo Vanzin <vanzin@cloudera.com>   2015-11-25 09:47:20 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>   2015-11-25 09:47:20 -0800
commit     c1f85fc71e71e07534b89c84677d977bb20994f8 (patch)
tree       906ab2d70f3b9c70ae26222742382702c0ffccae /core
parent     0a5aef753e70e93d7e56054f354a52e4d4e18932 (diff)
download   spark-c1f85fc71e71e07534b89c84677d977bb20994f8.tar.gz
           spark-c1f85fc71e71e07534b89c84677d977bb20994f8.tar.bz2
           spark-c1f85fc71e71e07534b89c84677d977bb20994f8.zip
[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.
- NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does not exist.
- The network library now correctly handles zero-sized streams.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9941 from vanzin/SPARK-11956.
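The first fix follows a simple pattern, visible in the NettyRpcEnv hunk below: the write/callback side records the failure and closes the pipe's source channel, so a reader blocked in read() wakes up and observes a failure instead of hanging. A minimal, self-contained sketch of that pattern (the class and method names here are illustrative, not part of the patch):

    import java.nio.ByteBuffer
    import java.nio.channels.{Pipe, ReadableByteChannel}
    import scala.util.{Failure, Success, Try}

    // Sketch of the error-propagation pattern applied in NettyRpcEnv below.
    class ErrorAwareSource(source: Pipe.SourceChannel) extends ReadableByteChannel {
      @volatile private var error: Throwable = _

      // Called from the writer side on failure. Closing the source channel
      // unblocks any reader currently parked inside read().
      def setError(e: Throwable): Unit = {
        error = e
        source.close()
      }

      override def read(dst: ByteBuffer): Int = {
        val result = if (error == null) Try(source.read(dst)) else Failure(error)
        result match {
          case Success(bytesRead) => bytesRead
          case Failure(t) => throw t
        }
      }

      override def close(): Unit = source.close()
      override def isOpen(): Boolean = source.isOpen()
    }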
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala        | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala              | 27
3 files changed, 35 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68701f609f..c8fa870f50 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -27,7 +27,7 @@ import javax.annotation.Nullable
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag
-import scala.util.{DynamicVariable, Failure, Success}
+import scala.util.{DynamicVariable, Failure, Success, Try}
import scala.util.control.NonFatal
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -368,13 +368,22 @@ private[netty] class NettyRpcEnv(
@volatile private var error: Throwable = _
- def setError(e: Throwable): Unit = error = e
+ def setError(e: Throwable): Unit = {
+ error = e
+ source.close()
+ }
override def read(dst: ByteBuffer): Int = {
- if (error != null) {
- throw error
+ val result = if (error == null) {
+ Try(source.read(dst))
+ } else {
+ Failure(error)
+ }
+
+ result match {
+ case Success(bytesRead) => bytesRead
+ case Failure(error) => throw error
}
- source.read(dst)
}
override def close(): Unit = source.close()
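A quick usage sketch of the hypothetical ErrorAwareSource above, showing the behavior the patch establishes: once setError() has run, a read throws instead of blocking or returning stale data:

    import java.io.IOException
    import java.nio.ByteBuffer
    import java.nio.channels.Pipe

    object ErrorAwareSourceDemo {
      def main(args: Array[String]): Unit = {
        val pipe = Pipe.open()
        val channel = new ErrorAwareSource(pipe.source())
        // Simulate a remote failure reported by the write side.
        channel.setError(new IOException("remote stream failed"))
        try channel.read(ByteBuffer.allocate(16))
        catch { case e: IOException => println(s"propagated: ${e.getMessage}") }
      }
    }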
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index eb1d2604fb..a2768b4252 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
}
- require(file != null, s"File not found: $streamId")
+ require(file != null && file.isFile(), s"File not found: $streamId")
new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
}
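The one-line change above matters because java.io.File.length() returns 0 for a path that does not exist, so a registered file that was deleted from disk could previously be wrapped in a zero-length FileSegmentManagedBuffer instead of failing the request. File.isFile() is false for both nonexistent paths and directories, and require() raises IllegalArgumentException with the given message. A minimal sketch of the guard, using a hypothetical lookup map in place of the real stream manager state:

    import java.io.File

    object StreamResolverSketch {
      // Fail fast when the requested stream id does not map to an
      // existing regular file on disk.
      def resolveFile(registered: Map[String, File], streamId: String): File = {
        val file = registered.getOrElse(streamId, null)
        require(file != null && file.isFile(), s"File not found: $streamId")
        file
      }
    }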
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 2b664c6313..6cc958a5f6 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
val file = new File(tempDir, "file")
Files.write(UUID.randomUUID().toString(), file, UTF_8)
+ val empty = new File(tempDir, "empty")
+ Files.write("", empty, UTF_8);
val jar = new File(tempDir, "jar")
Files.write(UUID.randomUUID().toString(), jar, UTF_8)
val fileUri = env.fileServer.addFile(file)
+ val emptyUri = env.fileServer.addFile(empty)
val jarUri = env.fileServer.addJar(jar)
val destDir = Utils.createTempDir()
- val destFile = new File(destDir, file.getName())
- val destJar = new File(destDir, jar.getName())
-
val sm = new SecurityManager(conf)
val hc = SparkHadoopUtil.get.conf
- Utils.fetchFile(fileUri, destDir, conf, sm, hc, 0L, false)
- Utils.fetchFile(jarUri, destDir, conf, sm, hc, 0L, false)
- assert(Files.equal(file, destFile))
- assert(Files.equal(jar, destJar))
+ val files = Seq(
+ (file, fileUri),
+ (empty, emptyUri),
+ (jar, jarUri))
+ files.foreach { case (f, uri) =>
+ val destFile = new File(destDir, f.getName())
+ Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ assert(Files.equal(f, destFile))
+ }
+
+ // Try to download files that do not exist.
+ Seq("files", "jars").foreach { root =>
+ intercept[Exception] {
+ val uri = env.address.toSparkURL + s"/$root/doesNotExist"
+ Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ }
+ }
}
}
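The new negative-path test leans on ScalaTest's intercept[T], which runs a block, fails the test unless the block throws T (or a subtype), and returns the caught exception for further assertions. A standalone sketch, assuming a ScalaTest 3.x dependency (AnyFunSuite is the 3.x class name; Spark's own suites extend SparkFunSuite instead):

    import java.io.FileNotFoundException
    import org.scalatest.funsuite.AnyFunSuite

    class InterceptSketchSuite extends AnyFunSuite {
      test("fetching a missing stream fails") {
        // intercept fails the test if nothing (or the wrong type) is thrown.
        val e = intercept[Exception] {
          throw new FileNotFoundException("/files/doesNotExist")
        }
        assert(e.getMessage.contains("doesNotExist"))
      }
    }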