From 60e18ce7dd1016647b63586520b713efc45494a8 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 4 Apr 2014 17:29:29 -0700
Subject: SPARK-1414. Python API for SparkContext.wholeTextFiles

Also clarified comment on each file having to fit in memory

Author: Matei Zaharia

Closes #327 from mateiz/py-whole-files and squashes the following commits:

9ad64a5 [Matei Zaharia] SPARK-1414. Python API for SparkContext.wholeTextFiles
---
 core/src/main/scala/org/apache/spark/SparkContext.scala             | 2 +-
 .../src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 2 +-
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala     | 6 ++++--
 3 files changed, 6 insertions(+), 4 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 28a865c0ad..835cffe37a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -395,7 +395,7 @@ class SparkContext(
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   * @note Small files are preferred, as each file will be loaded fully in memory.
    */
   def wholeTextFiles(path: String): RDD[(String, String)] = {
     newAPIHadoopFile(
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 6cbdeac58d..a2855d4db1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   * @note Small files are preferred, as each file will be loaded fully in memory.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b67286a4e3..32f1100406 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
+import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
@@ -206,6 +207,7 @@ private object SpecialLengths {
 }
 
 private[spark] object PythonRDD {
+  val UTF8 = Charset.forName("UTF-8")
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
@@ -266,7 +268,7 @@ private[spark] object PythonRDD {
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes("UTF-8")
+    val bytes = str.getBytes(UTF8)
     dataOut.writeInt(bytes.length)
     dataOut.write(bytes)
   }
@@ -286,7 +288,7 @@ private[spark] object PythonRDD {
 
 private
 class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
+  override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
 }
 
 /**
--
cgit v1.2.3
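Editor's note: below is a minimal usage sketch (not part of the patch) of the wholeTextFiles API whose documentation is corrected above. The app name, input directory, and line-counting logic are illustrative assumptions; the PySpark-side change referenced in the subject lies outside the 'core/src' filter shown in this view.

// Usage sketch only, not from this patch. Assumes a Spark 1.x-era build;
// the path and line-count logic below are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WholeTextFilesExample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // wholeTextFiles returns an RDD of (fileName, fileContent) pairs.
    // Per the @note fixed above, each file is loaded fully into memory,
    // so prefer many small files over a few very large ones.
    val files = sc.wholeTextFiles("/tmp/small-docs")

    // Count lines per file as a trivial illustration.
    val lineCounts = files.map { case (name, content) => (name, content.split("\n").length) }
    lineCounts.collect().foreach { case (name, n) => println(s"$name: $n lines") }

    sc.stop()
  }
}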