From ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 15 Feb 2013 06:54:47 +0000 Subject: Made MasterFailureTest more robust. --- .../spark/streaming/util/MasterFailureTest.scala | 26 ++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index 83d8591a3a..776e676063 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -40,6 +40,8 @@ object MasterFailureTest extends Logging { println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") testUpdateStateByKey(directory, numBatches, batchDuration) + + println("\n\nSUCCESS\n\n") } def testMap(directory: String, numBatches: Int, batchDuration: Duration) { @@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) override def run() { val localTestDir = Files.createTempDir() - val fs = testDir.getFileSystem(new Configuration()) + var fs = testDir.getFileSystem(new Configuration()) + val maxTries = 3 try { Thread.sleep(5000) // To make sure that all the streaming context has been set up for (i <- 0 until input.size) { @@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") - //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString)) - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) - logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + var tries = 0 + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } + } + if (!done) + logError("Could not generate file " + hadoopFile) + else + logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) Thread.sleep(interval) localFile.delete() } -- cgit v1.2.3