diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-03-20 14:00:28 +0800 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-03-20 14:00:28 +0800 |
commit | d48ee7e55ededc0ef342d35639a6f815d3699263 (patch) | |
tree | d2a3ca9b4c3f54bef0d275b162ad635837cb1cf7 /core | |
parent | 00a11304fd9db527f5cf3cb5aad8ec38d2331414 (diff) | |
parent | 945d1e720eefe5bb896fe0806abf522259e7d72d (diff) | |
download | spark-d48ee7e55ededc0ef342d35639a6f815d3699263.tar.gz spark-d48ee7e55ededc0ef342d35639a6f815d3699263.tar.bz2 spark-d48ee7e55ededc0ef342d35639a6f815d3699263.zip |
Merge branch 'master' of github.com:mesos/spark into cogroup
Diffstat (limited to 'core')
4 files changed, 20 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 2bbc931316..da3f4f636c 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -74,16 +74,10 @@ private[spark] class Worker( def connectToMaster() { logInfo("Connecting to master " + masterUrl) - try { - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to master", e) - System.exit(1) - } + master = context.actorFor(Master.toAkkaUrl(masterUrl)) + master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing } def startWebUi() { diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 0d16cf6e85..6d862c0c28 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -37,8 +37,8 @@ class CoalescedRDD[T: ClassManifest]( prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) } } else { (0 until maxPartitions).map { i => - val rangeStart = (i * prevSplits.length) / maxPartitions - val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions + val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt + val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray) }.toArray } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index a6322dc58d..cbf5512e24 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.util.ReflectionUtils import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} import spark.util.NextIterator +import org.apache.hadoop.conf.Configurable /** @@ -50,6 +51,9 @@ class HadoopRDD[K, V]( override def getPartitions: Array[Partition] = { val inputFormat = createInputFormat(conf) + if (inputFormat.isInstanceOf[Configurable]) { + inputFormat.asInstanceOf[Configurable].setConf(conf) + } val inputSplits = inputFormat.getSplits(conf, minSplits) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { @@ -69,6 +73,9 @@ class HadoopRDD[K, V]( val conf = confBroadcast.value.value val fmt = createInputFormat(conf) + if (fmt.isInstanceOf[Configurable]) { + fmt.asInstanceOf[Configurable].setConf(conf) + } reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index df2361025c..bdd974590a 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -3,7 +3,7 @@ package spark.rdd import java.text.SimpleDateFormat import java.util.Date -import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ @@ -42,6 +42,9 @@ class NewHadoopRDD[K, V]( override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance + if (inputFormat.isInstanceOf[Configurable]) { + inputFormat.asInstanceOf[Configurable].setConf(conf) + } val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) @@ -57,6 +60,9 @@ class NewHadoopRDD[K, V]( val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance + if (format.isInstanceOf[Configurable]) { + format.asInstanceOf[Configurable].setConf(conf) + } val reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) |