author     Tathagata Das <tathagata.das1565@gmail.com>    2014-01-06 02:21:56 +0000
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-01-06 02:21:56 +0000
commit     8e88db3ca56e6a56668b029e39c8e96b86d4dd5e (patch)
tree       1128e7b153b788fe3fa040e434ad469f57c3cd4c
parent     3d4474330d9cd7d7c1b1e9fc1f8678bc6ee905e9 (diff)
Bug fixes to the DriverRunner and minor changes here and there.
-rw-r--r--  conf/slaves                                                                         | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala               | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala  | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala    | 8
4 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/conf/slaves b/conf/slaves
index da0a01343d..30ea300e07 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,5 @@
-# A Spark Worker will be started on each of the machines listed below.
-localhost
\ No newline at end of file
+ec2-54-221-59-252.compute-1.amazonaws.com
+ec2-67-202-26-243.compute-1.amazonaws.com
+ec2-23-22-220-97.compute-1.amazonaws.com
+ec2-50-16-98-100.compute-1.amazonaws.com
+ec2-54-234-164-206.compute-1.amazonaws.com
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 7485b89cf4..2d567b7a41 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,15 @@ private[spark] class DriverRunner(
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
-    val destPath = new Path(driverDir.getAbsolutePath())
-    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val destPath = new File(driverDir.getAbsolutePath(), jarPath.getName())
+    // val destFileSystem = destPath.getFileSystem(emptyConf)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
     val localJarFilename = localJarFile.getAbsolutePath
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
@@ -161,7 +161,7 @@ private[spark] class DriverRunner(
       val stderr = new File(baseDir, "stderr")
       val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
-      Files.write(header, stderr, Charsets.UTF_8)
+      Files.append(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.get.getErrorStream, stderr)
     }
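A minimal, self-contained sketch of the two library calls the patch switches to: Hadoop's FileUtil.copy overload that takes a java.io.File destination, and Guava's Files.append, which adds the launch-command header without truncating an existing stderr log. The object name, paths, and sample command below are hypothetical, not from this patch.

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

object DriverFilesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val jarPath = new Path("hdfs://namenode:8020/jobs/app.jar") // hypothetical source jar
    val jarFileSystem = jarPath.getFileSystem(conf)

    val driverDir = new File("/tmp/driver") // hypothetical working dir
    driverDir.mkdirs()
    // A java.io.File destination means this overload copies straight to the
    // local filesystem; 'false' keeps the HDFS source in place.
    val destPath = new File(driverDir, jarPath.getName)
    FileUtil.copy(jarFileSystem, jarPath, destPath, false, conf)

    // Append (rather than write) the header, so relaunching the driver does
    // not wipe the stderr it produced earlier.
    val stderr = new File(driverDir, "stderr")
    val header = "Launch Command: %s\n%s\n\n".format("\"java\" \"-cp\" \"...\"", "=" * 40)
    Files.append(header, stderr, Charsets.UTF_8)
  }
}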
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e25239bf..a5a5f2e751 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     override def cleanup() { }
 
     override def restore() {
-      hadoopFiles.foreach {
+      hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
         case (t, f) => {
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b0a4..6c1df4f9c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val checkpointTime = ssc.initialCheckpoint.checkpointTime
     val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
     val downTimes = checkpointTime.until(restartTime, batchDuration)
-    logInfo("Batches during down time: " + downTimes.mkString(", "))
+    logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
 
     // Batches that were unprocessed before failure
-    val pendingTimes = ssc.initialCheckpoint.pendingTimes
-    logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
+    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", "))
     // Reschedule jobs for these times
     val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
-    logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))
     timesToReschedule.foreach(time =>
       jobScheduler.runJobs(time, graph.generateJobs(time))
     )
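A runnable sketch of the restart bookkeeping above, with stand-in Time and Duration classes and made-up batch times: batches missed while the master was down (from the checkpoint time up to the restart time) are merged with the batches that were pending at failure, de-duplicated, and rescheduled in time order.

case class Duration(milliseconds: Long)
case class Time(milliseconds: Long) {
  // Every batch time from this time (inclusive) up to 'end' (exclusive).
  def until(end: Time, step: Duration): Seq[Time] =
    (milliseconds until end.milliseconds by step.milliseconds).map(Time(_))
}
object Time { val ordering: Ordering[Time] = Ordering.by[Time, Long](_.milliseconds) }

object RescheduleSketch extends App {
  val batchDuration = Duration(1000)
  val checkpointTime = Time(5000)                // last checkpoint before failure
  val restartTime = Time(9000)                   // first batch time after restart
  val pendingTimes = Seq(Time(5000), Time(4000)) // unprocessed before failure

  val downTimes = checkpointTime.until(restartTime, batchDuration)
  val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
  // Prints 5 batches, 4000 through 8000, each exactly once.
  println("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
}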