aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/SequenceFileRDDFunctions.scala')
-rw-r--r--core/src/main/scala/spark/SequenceFileRDDFunctions.scala29
1 files changed, 17 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3..2911f9036e 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -18,6 +18,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
@@ -36,17 +37,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
self: RDD[(K, V)])
extends Logging
with Serializable {
-
+
private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
- // is of the form "java.lang.Object apply(java.lang.Object)"
+ // is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
- m => m.getReturnType().toString != "java.lang.Object" &&
+ m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType
}
@@ -62,24 +63,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
* byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
* file system.
*/
- def saveAsSequenceFile(path: String) {
+ def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
def anyToWritable[U <% Writable](u: U): Writable = u
val keyClass = getWritableClass[K]
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
+ val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
+ path, keyClass, valueClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
+ path, keyClass, valueClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
- }
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
+ path, keyClass, valueClass, format, jobConf, codec)
+ }
}
}