author    Marcelo Vanzin <vanzin@cloudera.com>    2015-11-25 09:47:20 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>    2015-11-25 09:47:20 -0800
commit    c1f85fc71e71e07534b89c84677d977bb20994f8 (patch)
tree      906ab2d70f3b9c70ae26222742382702c0ffccae /core/src/test/scala/org/apache
parent    0a5aef753e70e93d7e56054f354a52e4d4e18932 (diff)
[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.
- NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does not exist.
- The network library now correctly handles zero-sized streams.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9941 from vanzin/SPARK-11956.
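The first bullet is the subtlest of the three fixes: when the thread feeding the write side of a pipe dies, a naive implementation just closes the pipe and the reader sees a clean end-of-stream. The following is a minimal, self-contained sketch of the general pattern, not Spark's actual NettyRpcEnv code; the openStream and produce names are invented for this illustration:

import java.io.{IOException, InputStream, PipedInputStream, PipedOutputStream}
import java.util.concurrent.atomic.AtomicReference

object PipePropagationSketch {
  // Returns the read side of a pipe fed by `produce` on a background
  // thread. If `produce` fails, the failure is re-thrown to the
  // reader instead of surfacing as a clean end-of-stream.
  def openStream(produce: PipedOutputStream => Unit): InputStream = {
    val error = new AtomicReference[Throwable]()
    val out = new PipedOutputStream()
    val in = new PipedInputStream(out)

    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        try produce(out)                            // may fail mid-transfer
        catch { case t: Throwable => error.set(t) } // remember why we stopped
        finally out.close()                         // always unblock the reader
      }
    })
    writer.setDaemon(true)
    writer.start()

    new InputStream {
      override def read(): Int = {
        val b = in.read()
        // EOF plus a recorded error means the transfer broke, not finished.
        if (b == -1 && error.get() != null) {
          throw new IOException("write side failed", error.get())
        }
        b
      }
      override def close(): Unit = in.close()
    }
  }
}

Without the wrapper at the end, a failure in produce() would look like a short but successful stream to the caller, which is exactly the silent-truncation class of bug the commit message describes.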
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 27 ++++++++++++++++++++-------
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 2b664c6313..6cc958a5f6 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val tempDir = Utils.createTempDir()
     val file = new File(tempDir, "file")
     Files.write(UUID.randomUUID().toString(), file, UTF_8)
+    val empty = new File(tempDir, "empty")
+    Files.write("", empty, UTF_8)
     val jar = new File(tempDir, "jar")
     Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
     val fileUri = env.fileServer.addFile(file)
+    val emptyUri = env.fileServer.addFile(empty)
     val jarUri = env.fileServer.addJar(jar)
 
     val destDir = Utils.createTempDir()
-    val destFile = new File(destDir, file.getName())
-    val destJar = new File(destDir, jar.getName())
-
     val sm = new SecurityManager(conf)
     val hc = SparkHadoopUtil.get.conf
-    Utils.fetchFile(fileUri, destDir, conf, sm, hc, 0L, false)
-    Utils.fetchFile(jarUri, destDir, conf, sm, hc, 0L, false)
-    assert(Files.equal(file, destFile))
-    assert(Files.equal(jar, destJar))
+    val files = Seq(
+      (file, fileUri),
+      (empty, emptyUri),
+      (jar, jarUri))
+    files.foreach { case (f, uri) =>
+      val destFile = new File(destDir, f.getName())
+      Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+      assert(Files.equal(f, destFile))
+    }
+
+    // Try to download files that do not exist.
+    Seq("files", "jars").foreach { root =>
+      intercept[Exception] {
+        val uri = env.address.toSparkURL + s"/$root/doesNotExist"
+        Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+      }
+    }
   }
 }
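To experiment with the fetch-and-verify pattern this test uses without a Spark build, the following self-contained sketch reproduces it with plain Guava (the same Files utilities the test imports). The fetch helper is a hypothetical stand-in for Utils.fetchFile, not Spark's API:

import java.io.{File, FileNotFoundException}
import java.nio.charset.StandardCharsets.UTF_8
import com.google.common.io.Files

object FetchPatternSketch {
  // Hypothetical stand-in for Utils.fetchFile: copy `src` into
  // `destDir` and return the destination file.
  def fetch(src: File, destDir: File): File = {
    val dest = new File(destDir, src.getName)
    Files.copy(src, dest)
    dest
  }

  def main(args: Array[String]): Unit = {
    val srcDir = Files.createTempDir()
    val destDir = Files.createTempDir()

    val file = new File(srcDir, "file")
    Files.write("payload", file, UTF_8)
    val empty = new File(srcDir, "empty")
    Files.write("", empty, UTF_8)

    // Both a regular and a zero-byte file should round-trip intact;
    // the zero-byte case is the zero-sized-stream bug the commit fixes.
    Seq(file, empty).foreach { f =>
      assert(Files.equal(f, fetch(f, destDir)))
    }

    // Fetching a file that was never created must fail loudly, just as
    // the fixed NettyStreamManager now throws for unknown paths.
    try {
      fetch(new File(srcDir, "doesNotExist"), destDir)
      throw new AssertionError("expected a failure for a missing file")
    } catch {
      case _: FileNotFoundException => println("missing file rejected")
    }
  }
}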