aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-15 06:54:47 +0000
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-15 06:54:47 +0000
commitddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (patch)
tree82ce69ba177a64064e8b27d8c0c5f75009cdb693 /streaming
parent4b8402e900c803e64b8a4e2094fd845ccfc9df36 (diff)
downloadspark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.tar.gz
spark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.tar.bz2
spark-ddcb976b0d7ce4a76168da33c0e947a5a6b5a255.zip
Made MasterFailureTest more robust.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala26
1 files changed, 22 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 83d8591a3a..776e676063 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -40,6 +40,8 @@ object MasterFailureTest extends Logging {
println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
testUpdateStateByKey(directory, numBatches, batchDuration)
+
+ println("\n\nSUCCESS\n\n")
}
def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
@@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
override def run() {
val localTestDir = Files.createTempDir()
- val fs = testDir.getFileSystem(new Configuration())
+ var fs = testDir.getFileSystem(new Configuration())
+ val maxTries = 3
try {
Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) {
@@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i+1).toString)
val hadoopFile = new Path(testDir, (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
- //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString))
- fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
- logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ var tries = 0
+ var done = false
+ while (!done && tries < maxTries) {
+ tries += 1
+ try {
+ fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ done = true
+ } catch {
+ case ioe: IOException => {
+ fs = testDir.getFileSystem(new Configuration())
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ }
+ }
+ }
+ if (!done)
+ logError("Could not generate file " + hadoopFile)
+ else
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval)
localFile.delete()
}