diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-15 06:54:47 +0000 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-15 06:54:47 +0000 |
commit | ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (patch) | |
tree | 82ce69ba177a64064e8b27d8c0c5f75009cdb693 | |
parent | 4b8402e900c803e64b8a4e2094fd845ccfc9df36 (diff) | |
download | spark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.tar.gz spark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.tar.bz2 spark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.zip |
Made MasterFailureTest more robust.
-rw-r--r-- | streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index 83d8591a3a..776e676063 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -40,6 +40,8 @@ object MasterFailureTest extends Logging { println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") testUpdateStateByKey(directory, numBatches, batchDuration) + + println("\n\nSUCCESS\n\n") } def testMap(directory: String, numBatches: Int, batchDuration: Duration) { @@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) override def run() { val localTestDir = Files.createTempDir() - val fs = testDir.getFileSystem(new Configuration()) + var fs = testDir.getFileSystem(new Configuration()) + val maxTries = 3 try { Thread.sleep(5000) // To make sure that all the streaming context has been set up for (i <- 0 until input.size) { @@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") - //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString)) - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) - logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + var tries = 0 + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } + } + if (!done) + logError("Could not generate file " + hadoopFile) + else + logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) Thread.sleep(interval) localFile.delete() } |