author    Davies Liu <davies@databricks.com>    2014-11-06 00:22:19 -0800
committer Matei Zaharia <matei@databricks.com>  2014-11-06 00:22:32 -0800
commit    01484455c4ee4ee8e848be56f395d38841fbf86a (patch)
tree      23123661a0bd3ac4e22132a353c62254b44d44c6 /python/pyspark
parent    2c84178b8283269512b1c968b9995a7bdedd7aa5 (diff)
[SPARK-4186] add binaryFiles and binaryRecords in Python
add binaryFiles() and binaryRecords() in Python

```
binaryFiles(self, path, minPartitions=None):

    :: Developer API ::

    Read a directory of binary files from HDFS, a local file system
    (available on all nodes), or any Hadoop-supported file system URI
    as a byte array. Each file is read as a single record and returned
    in a key-value pair, where the key is the path of each file and
    the value is the content of each file.

    Note: Small files are preferred; large files are also allowed,
    but may cause poor performance.

binaryRecords(self, path, recordLength):

    Load data from a flat binary file, assuming each record is a set
    of numbers with the specified numerical format (see ByteBuffer),
    and the number of bytes per record is constant.

    :param path: Directory to the input data files
    :param recordLength: The length at which to split the records
```

Author: Davies Liu <davies@databricks.com>

Closes #3078 from davies/binary and squashes the following commits:

cd0bdbd [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
3aa349b [Davies Liu] add experimental notes
24e84b6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
5ceaa8a [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
1900085 [Davies Liu] bugfix
bb22442 [Davies Liu] add binaryFiles and binaryRecords in Python

(cherry picked from commit b41a39e24038876359aeb7ce2bbbb4de2234e5f3)
Signed-off-by: Matei Zaharia <matei@databricks.com>
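For orientation, a minimal usage sketch of the two new APIs (not part of the patch; the input paths and application name are hypothetical):

```python
# Hedged usage sketch; paths and app name are made up for illustration.
from pyspark import SparkContext

sc = SparkContext(appName="binary-input-sketch")

# binaryFiles: one (file path, raw file contents) pair per input file.
files = sc.binaryFiles("hdfs:///data/blobs")
print(files.mapValues(len).collect())  # [(path, byte count), ...]

# binaryRecords: split each file into fixed-length byte records.
records = sc.binaryRecords("hdfs:///data/records", 16)
print(records.count())

sc.stop()
```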
Diffstat (limited to 'python/pyspark')
-rw-r--r--    python/pyspark/context.py    32
-rw-r--r--    python/pyspark/tests.py      19
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a0e4821728..faa5952258 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ class SparkContext(object):
        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))

+    def binaryFiles(self, path, minPartitions=None):
+        """
+        :: Experimental ::
+
+        Read a directory of binary files from HDFS, a local file system
+        (available on all nodes), or any Hadoop-supported file system URI
+        as a byte array. Each file is read as a single record and returned
+        in a key-value pair, where the key is the path of each file and
+        the value is the content of each file.
+
+        Note: Small files are preferred; large files are also allowed,
+        but may cause poor performance.
+        """
+        minPartitions = minPartitions or self.defaultMinPartitions
+        return RDD(self._jsc.binaryFiles(path, minPartitions), self,
+                   PairDeserializer(UTF8Deserializer(), NoOpSerializer()))
+
+    def binaryRecords(self, path, recordLength):
+        """
+        :: Experimental ::
+
+        Load data from a flat binary file, assuming each record is a set
+        of numbers with the specified numerical format (see ByteBuffer),
+        and the number of bytes per record is constant.
+
+        :param path: Directory to the input data files
+        :param recordLength: The length at which to split the records
+        """
+        return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())
+
    def _dictToJavaMap(self, d):
        jm = self._jvm.java.util.HashMap()
        if not d:
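Since the docstring points at Java's ByteBuffer for the record format, a natural Python-side decoder is the struct module (ByteBuffer defaults to big-endian, hence the '>' format prefix). A sketch, assuming an existing SparkContext `sc`; the record layout of one int32 followed by one float64 is an assumption for illustration:

```python
# Sketch only: the record layout (big-endian int32 + float64) is assumed,
# not prescribed by the patch; `sc` is an existing SparkContext.
import struct

FMT = '>id'                        # big-endian int32 followed by float64
RECORD_LEN = struct.calcsize(FMT)  # 12 bytes per record

def decode(record):
    # each record arrives as a byte string of exactly RECORD_LEN bytes
    return struct.unpack(FMT, record)

pairs = sc.binaryRecords("hdfs:///data/fixed", RECORD_LEN).map(decode)
```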
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7e61b017ef..9f625c5c6c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1110,6 +1110,25 @@ class InputFormatTests(ReusedPySparkTestCase):
              (u'\x03', [2.0])]
        self.assertEqual(maps, em)

+    def test_binary_files(self):
+        path = os.path.join(self.tempdir.name, "binaryfiles")
+        os.mkdir(path)
+        data = "short binary data"
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            f.write(data)
+        [(p, d)] = self.sc.binaryFiles(path).collect()
+        self.assertTrue(p.endswith("part-0000"))
+        self.assertEqual(d, data)
+
+    def test_binary_records(self):
+        path = os.path.join(self.tempdir.name, "binaryrecords")
+        os.mkdir(path)
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            for i in range(100):
+                f.write('%04d' % i)
+        result = self.sc.binaryRecords(path, 4).map(int).collect()
+        self.assertEqual(range(100), result)
+
class OutputFormatTests(ReusedPySparkTestCase):
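Note that the tests above assume Python 2: the binary file is opened in text mode ('w') and range(100) is compared directly against a list. A hypothetical Python 3 rendering of the second test (bytes formatting with b'%04d' requires Python 3.5+):

```python
# Hypothetical Python 3 variant of test_binary_records, not in the patch.
def test_binary_records(self):
    path = os.path.join(self.tempdir.name, "binaryrecords")
    os.mkdir(path)
    with open(os.path.join(path, "part-0000"), 'wb') as f:  # binary mode
        for i in range(100):
            f.write(b'%04d' % i)  # bytes formatting, Python 3.5+
    result = self.sc.binaryRecords(path, 4).map(int).collect()
    self.assertEqual(list(range(100)), result)  # range() is lazy in Py3
```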