author     Tathagata Das <tathagata.das1565@gmail.com>    2011-06-23 08:11:22 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2011-06-23 08:11:22 -0700
commit     3d2befe8315fc47c8bc999f3afbdad7cffd8ee60 (patch)
tree       f603ee3bd738b64a50738c6eaf9128fbfdfb00d0 /core
parent     b5e6645505b0b88a3912f5ab10a5b6bc618c97e0 (diff)
Improved HadoopFileWriter (saves key and value classes to jobconf)
Diffstat (limited to 'core')
-rw-r--r--    core/src/main/scala/spark/HadoopFileWriter.scala    16
1 file changed, 11 insertions(+), 5 deletions(-)
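
The patch threads two extra implicit ClassManifests (for the key type K and the value type V) through both constructors, so that when the job configuration is built the writer can record the concrete output key and value classes. A minimal sketch of the mechanism, assuming a hypothetical standalone helper (setOutputTypes is not part of this commit):

    import org.apache.hadoop.mapred.JobConf

    // Sketch only: an implicit ClassManifest captures the runtime Class of an
    // otherwise-erased type parameter; its .erasure is what the writer now
    // passes to setOutputKeyClass/setOutputValueClass on the JobConf.
    def setOutputTypes[K, V](conf: JobConf)
        (implicit km: ClassManifest[K], vm: ClassManifest[V]) {
      conf.setOutputKeyClass(km.erasure)
      conf.setOutputValueClass(vm.erasure)
    }

Without the manifests, K and V are erased at runtime and the writer has no way to name those classes when it fills in the JobConf.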
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
index a6fd059344..c6ff83cadb 100644
--- a/core/src/main/scala/spark/HadoopFileWriter.scala
+++ b/core/src/main/scala/spark/HadoopFileWriter.scala
@@ -31,7 +31,7 @@ extends HadoopFileWriter [NullWritable, Text, TextOutputFormat[NullWritable, Text]]
@serializable
class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
(path: String, @transient jobConf: JobConf)
- (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
+ (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]) {
private val now = new Date()
private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
@@ -49,7 +49,7 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
- def this (path: String)(implicit fm: ClassManifest[F], cm: ClassManifest[C]) = this(path, null)
+ def this (path: String)(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]) = this(path, null)
def preSetup() {
setIDs(0, 0, 0)
@@ -78,9 +78,13 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
}
def write(key: K, value: V) {
- if (writer!=null)
+ if (writer!=null) {
+ val key1 = key.asInstanceOf[Any]
+ val value1 = value.asInstanceOf[Any]
+ //println ("Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+ println ("Writing ("+key.toString+ ", " + value.toString + ")")
writer.write(key, value)
- else
+ } else
throw new IOException("Writer is null, open() has not been called")
}
@@ -154,10 +158,12 @@ class HadoopFileWriter [K, V, F <: OutputFormat[K,V], C <: OutputCommitter]
if (!confProvided) {
conf.value.setOutputFormat(fm.erasure.asInstanceOf[Class[F]])
conf.value.setOutputCommitter(cm.erasure.asInstanceOf[Class[C]])
+ conf.value.setOutputKeyClass(km.erasure)
+ conf.value.setOutputValueClass(vm.erasure)
}
FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
-
+
conf.value.set("mapred.job.id", jID.value.toString);
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
conf.value.set("mapred.task.id", taID.value.toString);