aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala14
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala4
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala7
-rw-r--r--core/src/main/scala/spark/rdd/NewHadoopRDD.scala8
4 files changed, 20 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 2bbc931316..da3f4f636c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -74,16 +74,10 @@ private[spark] class Worker(
def connectToMaster() {
logInfo("Connecting to master " + masterUrl)
- try {
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to master", e)
- System.exit(1)
- }
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
}
def startWebUi() {
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 0d16cf6e85..6d862c0c28 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -37,8 +37,8 @@ class CoalescedRDD[T: ClassManifest](
prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) }
} else {
(0 until maxPartitions).map { i =>
- val rangeStart = (i * prevSplits.length) / maxPartitions
- val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
+ val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt
+ val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt
new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray)
}.toArray
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index a6322dc58d..cbf5512e24 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.util.ReflectionUtils
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.util.NextIterator
+import org.apache.hadoop.conf.Configurable
/**
@@ -50,6 +51,9 @@ class HadoopRDD[K, V](
override def getPartitions: Array[Partition] = {
val inputFormat = createInputFormat(conf)
+ if (inputFormat.isInstanceOf[Configurable]) {
+ inputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
@@ -69,6 +73,9 @@ class HadoopRDD[K, V](
val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
+ if (fmt.isInstanceOf[Configurable]) {
+ fmt.asInstanceOf[Configurable].setConf(conf)
+ }
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index df2361025c..bdd974590a 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -3,7 +3,7 @@ package spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -42,6 +42,9 @@ class NewHadoopRDD[K, V](
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
+ if (inputFormat.isInstanceOf[Configurable]) {
+ inputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
@@ -57,6 +60,9 @@ class NewHadoopRDD[K, V](
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
+ if (format.isInstanceOf[Configurable]) {
+ format.asInstanceOf[Configurable].setConf(conf)
+ }
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)