author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 02:21:56 +0000
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 02:21:56 +0000
commit | 8e88db3ca56e6a56668b029e39c8e96b86d4dd5e (patch)
tree | 1128e7b153b788fe3fa040e434ad469f57c3cd4c
parent | 3d4474330d9cd7d7c1b1e9fc1f8678bc6ee905e9 (diff)
Bug fixes to the DriverRunner and minor changes here and there.
4 files changed, 14 insertions, 11 deletions
diff --git a/conf/slaves b/conf/slaves
index da0a01343d..30ea300e07 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,5 @@
-# A Spark Worker will be started on each of the machines listed below.
-localhost
\ No newline at end of file
+ec2-54-221-59-252.compute-1.amazonaws.com
+ec2-67-202-26-243.compute-1.amazonaws.com
+ec2-23-22-220-97.compute-1.amazonaws.com
+ec2-50-16-98-100.compute-1.amazonaws.com
+ec2-54-234-164-206.compute-1.amazonaws.com

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 7485b89cf4..2d567b7a41 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,15 @@ private[spark] class DriverRunner(
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
-    val destPath = new Path(driverDir.getAbsolutePath())
-    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val destPath = new File(driverDir.getAbsolutePath(), jarPath.getName())
+    // val destFileSystem = destPath.getFileSystem(emptyConf)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
     val localJarFilename = localJarFile.getAbsolutePath
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
@@ -161,7 +161,7 @@ private[spark] class DriverRunner(
       val stderr = new File(baseDir, "stderr")
       val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
-      Files.write(header, stderr, Charsets.UTF_8)
+      Files.append(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.get.getErrorStream, stderr)
     }
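The DriverRunner fix copies the user jar from the jar's own Hadoop FileSystem straight to a local java.io.File, using the (FileSystem, Path, File, Boolean, Configuration) overload of FileUtil.copy, and appends the launch-command header to stderr instead of overwriting whatever is already there. Below is a minimal, self-contained sketch of those two corrected calls; the jar URL, driver directory, and command string are made-up placeholders, not values from the actual DriverRunner.

```scala
import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

object DriverRunnerSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs standing in for the driver description's jar URL and work dir.
    val jarUrl = "hdfs://namenode:9000/user/jars/app.jar"
    val driverDir = new File("/tmp/driver-0001")
    driverDir.mkdirs()

    val jarPath = new Path(jarUrl)
    val hadoopConf = new Configuration()
    val jarFileSystem = jarPath.getFileSystem(hadoopConf)

    // Destination is a plain java.io.File, so the (FileSystem, Path, File, ...) overload applies.
    val localJarFile = new File(driverDir, jarPath.getName)
    if (!localJarFile.exists()) {
      FileUtil.copy(jarFileSystem, jarPath, localJarFile, false, hadoopConf)
    }

    // Append the launch-command header rather than truncating an existing stderr file.
    val stderr = new File(driverDir, "stderr")
    val header = "Launch Command: %s\n%s\n\n".format("<command>", "=" * 40)
    Files.append(header, stderr, Charsets.UTF_8)
  }
}
```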
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e25239bf..a5a5f2e751 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     override def cleanup() { }
 
     override def restore() {
-      hadoopFiles.foreach {
+      hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
         case (t, f) => {
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +
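hadoopFiles is a mutable HashMap keyed by batch time, so iterating it directly during restore() replays checkpointed file batches in arbitrary order; converting it to a sequence and sorting with Time.ordering makes recovery replay them oldest-first. A small self-contained sketch of that ordering idea follows, with a simplified stand-in for org.apache.spark.streaming.Time and invented file names:

```scala
import scala.collection.mutable

// Simplified stand-in for org.apache.spark.streaming.Time.
case class Time(milliseconds: Long)
object Time {
  val ordering: Ordering[Time] = Ordering.by((t: Time) => t.milliseconds)
}

object RestoreOrderingSketch {
  def main(args: Array[String]): Unit = {
    // Checkpointed batch time -> files found for that batch (hypothetical data).
    val hadoopFiles = mutable.HashMap(
      Time(3000L) -> Array("f5", "f6"),
      Time(1000L) -> Array("f1", "f2"),
      Time(2000L) -> Array("f3", "f4")
    )

    // Iterating the HashMap directly gives no ordering guarantee; sort by batch time first.
    hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, files) =>
      println(s"Restoring files for time $t - ${files.mkString(", ")}")
    }
  }
}
```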
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b0a4..6c1df4f9c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val checkpointTime = ssc.initialCheckpoint.checkpointTime
     val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
     val downTimes = checkpointTime.until(restartTime, batchDuration)
-    logInfo("Batches during down time: " + downTimes.mkString(", "))
+    logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
 
     // Batches that were unprocessed before failure
-    val pendingTimes = ssc.initialCheckpoint.pendingTimes
-    logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
+    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", "))
     // Reschedule jobs for these times
     val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
-    logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))
     timesToReschedule.foreach(time =>
       jobScheduler.runJobs(time, graph.generateJobs(time))
     )
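On restart from a checkpoint, JobGenerator merges the batches that fall in the downtime window (checkpoint time up to the restart time) with the batches that were still pending when the failure happened, deduplicates them, and replays them in time order. The sketch below walks through that same set computation with plain millisecond longs standing in for Time and Duration; the batch duration, checkpoint time, restart time, and pending batches are invented example values:

```scala
object RescheduleSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values: 1s batches, checkpoint written at t=10s, restart at t=14s.
    val batchDurationMs = 1000L
    val checkpointTimeMs = 10000L
    val restartTimeMs = 14000L

    // Batches whose scheduled time fell inside the downtime window
    // (the role of checkpointTime.until(restartTime, batchDuration)).
    val downTimes: Seq[Long] = checkpointTimeMs until restartTimeMs by batchDurationMs

    // Batches that were generated but not fully processed before the failure
    // (would come from the checkpoint's pendingTimes in the real code).
    val pendingTimes: Seq[Long] = Seq(9000L, 10000L).sorted

    // Merge, deduplicate, and replay oldest-first, mirroring
    // (pendingTimes ++ downTimes).distinct.sorted(Time.ordering).
    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted
    println("Batches to reschedule (" + timesToReschedule.size + " batches): " +
      timesToReschedule.mkString(", "))
  }
}
```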