aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-04-28 22:26:45 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-04-28 22:26:45 +0530
commitafee9024430ef79cc0840a5e5788b60c8c53f9d2 (patch)
tree49f305108764dc00cba307f4f3d4a8774c3e2674 /streaming/src/main
parent1f20ef256715e5a84ba1661e235b6eda21a70b5b (diff)
downloadspark-afee9024430ef79cc0840a5e5788b60c8c53f9d2.tar.gz
spark-afee9024430ef79cc0840a5e5788b60c8c53f9d2.tar.bz2
spark-afee9024430ef79cc0840a5e5788b60c8c53f9d2.zip
Attempt to fix streaming test failures after yarn branch merge
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala30
-rw-r--r--streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala8
2 files changed, 30 insertions, 8 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e303e33e5e..7bd104b8d5 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
+ // The file to which we actually write - and then "move" to file.
+ private val writeFile = new Path(file.getParent, file.getName + ".next")
+ private val bakFile = new Path(file.getParent, file.getName + ".bk")
+
+ @volatile private var stopped = false
+
val conf = new Configuration()
var fs = file.getFileSystem(conf)
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
+ // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+ // I did not notice any errors - reintroduce it ?
+
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
while (attempts < maxAttempts) {
+ if (stopped) {
+ logInfo("Already stopped, ignore checkpoint attempt for " + file)
+ return
+ }
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
- }
- val fos = fs.create(file)
+ // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+ val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
- fos.close()
+ if (fs.exists(file) && fs.rename(file, bakFile)) {
+ logDebug("Moved existing checkpoint file to " + bakFile)
+ }
+ // paranoia
+ fs.delete(file, false)
+ fs.rename(writeFile, file)
+
val finishTime = System.currentTimeMillis();
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
def stop() {
+ stopped = true
executor.shutdown()
}
}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index f673e5be15..e7a3f92bc0 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -74,6 +74,7 @@ object MasterFailureTest extends Logging {
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values)
Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
}
st.flatMap(_.split(" "))
@@ -159,6 +160,7 @@ object MasterFailureTest extends Logging {
// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
@@ -205,6 +207,7 @@ object MasterFailureTest extends Logging {
// (iii) Its not timed out yet
System.clearProperty("spark.streaming.clock")
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
ssc.start()
val startTime = System.currentTimeMillis()
while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
// Write the data to a local file and then move it to the target test directory
val localFile = new File(localTestDir, (i+1).toString)
val hadoopFile = new Path(testDir, (i+1).toString)
+ val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
var done = false
while (!done && tries < maxTries) {
tries += 1
try {
- fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+ fs.rename(tempHadoopFile, hadoopFile)
done = true
} catch {
case ioe: IOException => {