aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAnderson de Andrade <adeandrade@verticalscope.com>2015-12-03 16:37:00 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-03 16:37:00 -0800
commitf434f36d508eb4dcade70871611fc022ae0feb56 (patch)
tree7a667a6e62d87e59bc64ecdef9dbc5dd11d9a34c /core
parent2213441e5e0fba01e05826257604aa427cdf2598 (diff)
downloadspark-f434f36d508eb4dcade70871611fc022ae0feb56.tar.gz
spark-f434f36d508eb4dcade70871611fc022ae0feb56.tar.bz2
spark-f434f36d508eb4dcade70871611fc022ae0feb56.zip
[SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf.
TaskAttemptContext's constructor will clone the configuration instead of referencing it. Calling setConf after creating TaskAttemptContext makes any changes to the configuration made inside setConf unperceived by RecordReader instances. As an example, Titan's InputFormat will change conf when calling setConf. They wrap their InputFormat around Cassandra's ColumnFamilyInputFormat, and append Cassandra's configuration. This change fixes the following error when using Titan's CassandraInputFormat with Spark: *java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_key space_args(keyspace:null)* There's a discussion of this error here: https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE Author: Anderson de Andrade <adeandrade@verticalscope.com> Closes #10046 from adeandrade/newhadooprdd-fix.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d1960990da..86f38ae836 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -138,14 +138,14 @@ class NewHadoopRDD[K, V](
}
inputMetrics.setBytesReadCallback(bytesReadCallback)
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
format match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+ val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)