author     Harvey Feng <harvey@databricks.com>  2013-10-05 16:57:08 -0700
committer  Harvey Feng <harvey@databricks.com>  2013-10-05 16:57:08 -0700
commit     b5e93c1227f0af965f15e9455e5f4bd72680ebde
tree       192f439e7c5fedbc06f03a300ccf61e8763e31c5 /core
parent     7d06bdde1d1364dcbef67079b23f6e9777a2de2e
Fix API changes; lines > 100 chars.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 16
2 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ada1037bd6..df32a4abe8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -84,9 +84,11 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val environment: Map[String, String] = Map(),
- // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
- // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+ // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+ // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+ // of data-local splits on host
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+ scala.collection.immutable.Map())
extends Logging {
// Ensure logging is initialized before we spawn any threads
@@ -239,7 +241,8 @@ class SparkContext(
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -332,15 +335,15 @@ class SparkContext(
* etc).
*/
def hadoopRDD[K, V](
- jobConf: JobConf,
+ conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
- SparkEnv.get.hadoop.addCredentials(jobConf)
- new HadoopRDD(this, jobConf, inputFormatClass, keyClass, valueClass, minSplits)
+ SparkEnv.get.hadoop.addCredentials(conf)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
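The hadoopRDD hunk above renames the first parameter from jobConf to conf and keeps the addCredentials call that copies any necessary security credentials into the conf before it is broadcast. Below is a minimal usage sketch of this API; the local master, app name, input path, and object name are illustrative placeholders rather than anything taken from this patch.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext

    object HadoopRDDUsageSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical local context; master and app name are placeholders.
        val sc = new SparkContext("local[2]", "hadoopRDD-example")

        // Build a JobConf for the old (mapred) InputFormat API.
        val conf = new JobConf()
        FileInputFormat.setInputPaths(conf, "hdfs:///tmp/input") // placeholder path

        // The first parameter is now named `conf` (previously `jobConf`); hadoopRDD
        // adds any necessary security credentials to it before broadcasting.
        val records = sc.hadoopRDD(conf, classOf[TextInputFormat],
          classOf[LongWritable], classOf[Text], minSplits = 2)

        println(records.map(_._2.toString).count())
        sc.stop()
      }
    }

Only the parameter name changed; call sites that pass the JobConf positionally are unaffected.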
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 404532dad4..728f3d1aed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -43,18 +43,18 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
class HadoopFileRDD[K, V](
sc: SparkContext,
path: String,
- confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
- extends HadoopRDD[K, V](sc, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) {
+ extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
- val newJobConf = new JobConf(confBroadcast.value.value)
+ val newJobConf = new JobConf(broadcastedConf.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
@@ -80,7 +80,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
class HadoopRDD[K, V](
sc: SparkContext,
- confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@@ -89,14 +89,14 @@ class HadoopRDD[K, V](
def this(
sc: SparkContext,
- jobConf: JobConf,
+ conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int) = {
this(
sc,
- sc.broadcast(new SerializableWritable(jobConf))
+ sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
inputFormatClass,
keyClass,
@@ -110,13 +110,13 @@ class HadoopRDD[K, V](
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
- val conf: Configuration = confBroadcast.value.value
+ val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
return conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
- val newJobConf = new JobConf(confBroadcast.value.value)
+ val newJobConf = new JobConf(broadcastedConf.value.value)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
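Taken together, the renames make the two configuration paths in HadoopRDD easier to tell apart: conf is the plain JobConf supplied by the caller, while broadcastedConf is the Broadcast[SerializableWritable[Configuration]] shipped once to each executor, from which getJobConf() either reuses a JobConf directly or builds one and caches it under jobConfCacheKey. The snippet below is a simplified, self-contained sketch of that cache-or-build logic, assuming a hypothetical JobConfCache object; it is not Spark's actual implementation.

    import scala.collection.mutable
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    // Hypothetical, simplified stand-in for HadoopRDD's cached-metadata map.
    object JobConfCache {
      private val cache = mutable.Map[String, JobConf]()

      // Mirrors the branches of HadoopRDD.getJobConf above.
      def getOrBuild(cacheKey: String, broadcastedConf: Configuration): JobConf =
        broadcastedConf match {
          // If the broadcast value is already a JobConf, reuse it as-is.
          case jc: JobConf => jc
          // Otherwise build a JobConf from the broadcast Configuration once
          // per process and cache it under this RDD's key.
          case conf => cache.synchronized {
            cache.getOrElseUpdate(cacheKey, new JobConf(conf))
          }
        }
    }

Caching the rebuilt JobConf this way means each executor constructs it once per RDD rather than once per task.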