author     Kan Zhang <kzhang@apache.org>        2014-07-30 13:19:05 -0700
committer  Josh Rosen <joshrosen@apache.org>    2014-07-30 13:19:05 -0700
commit     94d1f46fc43c0cb85125f757fb40db9271caf1f4 (patch)
tree       8878443a963ad6ce5ba3af679567d893c8df70cc /python
parent     437dc8c5b54f0dcf9564c1fb07e8dce9e771c8cd (diff)
[SPARK-2024] Add saveAsSequenceFile to PySpark

JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a follow-up to #455 and adds support for saving PySpark RDDs using SequenceFile or any other Hadoop OutputFormat.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both the old and new MapReduce APIs.
* Added a default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
* There is no out-of-the-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter converts any custom ArrayWritable subtype to ```Object[]```, which gets pickled to a Python tuple.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kzhang@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
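
For orientation, a minimal usage sketch of the API added by this patch, mirroring the round trip exercised in the new tests below; the local master, app name and output path are illustrative, not part of the patch:

from pyspark import SparkContext

sc = SparkContext("local", "sequencefile-example")

# Save a pair RDD as a SequenceFile; keys and values are converted to
# Writables by the default converter.
rdd = sc.parallelize([(1, u'aa'), (2, u'bb'), (3, u'cc')])
rdd.saveAsSequenceFile("/tmp/sfint")

# Read it back; the key/value Writable classes are inferred when not given.
print(sorted(sc.sequenceFile("/tmp/sfint").collect()))
# [(1, u'aa'), (2, u'bb'), (3, u'cc')]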
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py   51
-rw-r--r--  python/pyspark/rdd.py      114
-rw-r--r--  python/pyspark/tests.py    317
3 files changed, 454 insertions(+), 28 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee03f..7b0f8d83ae 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
+ _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None):
+ valueConverter=None, minSplits=None, batchSize=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
@param valueConverter:
@param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
- keyConverter, valueConverter, minSplits)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, minSplits, batchSize)
+ return RDD(jrdd, self, ser)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
- keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
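
For reference, a small sketch of the batchSize argument the read methods above now accept (continuing from the sketch in the commit message; the path is illustrative). With the default batch size of 10, Python objects are grouped into batches before pickling, while batchSize=1 falls back to an unbatched PickleSerializer:

batched = sc.sequenceFile("/tmp/sfint")                 # BatchedSerializer(PickleSerializer())
unbatched = sc.sequenceFile("/tmp/sfint", batchSize=1)  # plain PickleSerializer()
assert sorted(batched.collect()) == sorted(unbatched.collect())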
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976114..e8fcc900ef 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
self._jrdd_deserializer = jrdd_deserializer
self._id = jrdd.id()
+ def _toPickleSerialization(self):
+ if (self._jrdd_deserializer == PickleSerializer() or
+ self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+ return self
+ else:
+ return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
def id(self):
"""
A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
"""
return self.take(1)[0]
+ def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
+ converted for output using either user specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, True)
+
+ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop job configuration, passed in as a dict (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+
+ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
+ converted for output using either user specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, False)
+
+ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None,
+ compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: (None by default)
+ @param compressionCodecClass: (None by default)
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+ jconf, compressionCodecClass)
+
+ def saveAsSequenceFile(self, path, compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+ RDD's key and value types. The mechanism is as follows:
+ 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
+ 2. Keys and values of this Java RDD are converted to Writables and written out.
+
+ @param path: path to sequence file
+ @param compressionCodecClass: (None by default)
+ """
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ path, compressionCodecClass)
+
def saveAsPickleFile(self, path, batchSize=10):
"""
Save this RDD as a SequenceFile of serialized objects. The serializer
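
As a sketch of how the save methods added above are called (mirroring test_oldhadoop in the tests that follow; the output paths are illustrative), the old-API variants take either explicit Writable classes or a full job configuration dict, and the saveAsNewAPIHadoop* variants follow the same pattern with the mapreduce-package classes:

# Old-API save with explicit OutputFormat and key/value Writable classes.
rdd = sc.parallelize([(1, {"row1": 1.0}), (2, {"row2": 2.0})])
rdd.saveAsHadoopFile(
    "/tmp/oldhadoop",
    "org.apache.hadoop.mapred.SequenceFileOutputFormat",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.hadoop.io.MapWritable")

# Old-API save driven entirely by a Hadoop job configuration dict.
conf = {
    "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
    "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
    "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
    "mapred.output.dir": "/tmp/olddataset"}
rdd.saveAsHadoopDataset(conf)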
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8486c8595b..c29deb9574 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,7 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
+from array import array
from fileinput import input
from glob import glob
import os
@@ -327,6 +328,17 @@ class TestInputFormat(PySparkTestCase):
ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
self.assertEqual(doubles, ed)
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BytesWritable").collect())
+ ebs = [(1, bytearray('aa', 'utf-8')),
+ (1, bytearray('aa', 'utf-8')),
+ (2, bytearray('aa', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (3, bytearray('cc', 'utf-8'))]
+ self.assertEqual(bytes, ebs)
+
text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
"org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").collect())
@@ -353,14 +365,34 @@ class TestInputFormat(PySparkTestCase):
maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable").collect())
- em = [(1, {2.0: u'aa'}),
+ em = [(1, {}),
(1, {3.0: u'bb'}),
(2, {1.0: u'aa'}),
(2, {1.0: u'cc'}),
- (2, {3.0: u'bb'}),
(3, {2.0: u'dd'})]
self.assertEqual(maps, em)
+ # arrays get pickled to tuples by default
+ tuples = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable").collect())
+ et = [(1, ()),
+ (2, (3.0, 4.0, 5.0)),
+ (3, (4.0, 5.0, 6.0))]
+ self.assertEqual(tuples, et)
+
+ # with custom converters, primitive arrays can stay as arrays
+ arrays = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ ea = [(1, array('d')),
+ (2, array('d', [3.0, 4.0, 5.0])),
+ (3, array('d', [4.0, 5.0, 6.0]))]
+ self.assertEqual(arrays, ea)
+
clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
@@ -369,6 +401,12 @@ class TestInputFormat(PySparkTestCase):
u'double': 54.0, u'int': 123, u'str': u'test1'})
self.assertEqual(clazz[0], ec)
+ unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_clazz[0], ec)
+
def test_oldhadoop(self):
basepath = self.tempdir.name
ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
@@ -379,10 +417,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.hadoopFile(hellopath,
- "org.apache.hadoop.mapred.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ oldconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
@@ -397,10 +436,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.newAPIHadoopFile(hellopath,
- "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ newconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
@@ -435,16 +475,267 @@ class TestInputFormat(PySparkTestCase):
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text"))
- def test_converter(self):
+ def test_converters(self):
+ # use of custom converters
basepath = self.tempdir.name
maps = sorted(self.sc.sequenceFile(
basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable",
- valueConverter="org.apache.spark.api.python.TestConverter").collect())
- em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+ keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
+ em = [(u'\x01', []),
+ (u'\x01', [3.0]),
+ (u'\x02', [1.0]),
+ (u'\x02', [1.0]),
+ (u'\x03', [2.0])]
+ self.assertEqual(maps, em)
+
+class TestOutputFormat(PySparkTestCase):
+
+ def setUp(self):
+ PySparkTestCase.setUp(self)
+ self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(self.tempdir.name)
+
+ def tearDown(self):
+ PySparkTestCase.tearDown(self)
+ shutil.rmtree(self.tempdir.name, ignore_errors=True)
+
+ def test_sequencefiles(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
+ ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
+ self.assertEqual(ints, ei)
+
+ ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+ self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
+ doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+ self.assertEqual(doubles, ed)
+
+ ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
+ self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
+ self.assertEqual(bytes, ebs)
+
+ et = [(u'1', u'aa'),
+ (u'2', u'bb'),
+ (u'3', u'cc')]
+ self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
+ text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
+ self.assertEqual(text, et)
+
+ eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+ self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
+ bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
+ self.assertEqual(bools, eb)
+
+ en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+ self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
+ nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
+ self.assertEqual(nulls, en)
+
+ em = [(1, {}),
+ (1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (2, {1.0: u'cc'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
+ maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
self.assertEqual(maps, em)
+ def test_oldhadoop(self):
+ basepath = self.tempdir.name
+ dict_data = [(1, {}),
+ (1, {"row1" : 1.0}),
+ (2, {"row2" : 2.0})]
+ self.sc.parallelize(dict_data).saveAsHadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable")
+ result = sorted(self.sc.hadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable").collect())
+ self.assertEqual(result, dict_data)
+
+ conf = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
+ "mapred.output.dir" : basepath + "/olddataset/"}
+ self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
+ input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+ old_dataset = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable",
+ conf=input_conf).collect())
+ self.assertEqual(old_dataset, dict_data)
+
+ def test_newhadoop(self):
+ basepath = self.tempdir.name
+ # use custom ArrayWritable types and converters to handle arrays
+ array_data = [(1, array('d')),
+ (1, array('d', [1.0, 2.0, 3.0])),
+ (2, array('d', [3.0, 4.0, 5.0]))]
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ result = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ self.assertEqual(result, array_data)
+
+ conf = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
+ "mapred.output.dir" : basepath + "/newdataset/"}
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+ new_dataset = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+ conf=input_conf).collect())
+ self.assertEqual(new_dataset, array_data)
+
+ def test_newolderror(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/newolderror/saveAsHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/newolderror/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+
+ def test_bad_inputs(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/badinputs/saveAsHadoopFile/",
+ "org.apache.hadoop.mapred.NotValidOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/badinputs/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
+
+ def test_converters(self):
+ # use of custom converters
+ basepath = self.tempdir.name
+ data = [(1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+ basepath + "/converters/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
+ converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
+ expected = [(u'1', 3.0),
+ (u'2', 1.0),
+ (u'3', 2.0)]
+ self.assertEqual(converted, expected)
+
+ def test_reserialization(self):
+ basepath = self.tempdir.name
+ x = range(1, 5)
+ y = range(1001, 1005)
+ data = zip(x, y)
+ rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
+ rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
+ result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
+ self.assertEqual(result1, data)
+
+ rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
+ self.assertEqual(result2, data)
+
+ rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
+ self.assertEqual(result3, data)
+
+ conf4 = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/dataset"}
+ rdd.saveAsHadoopDataset(conf4)
+ result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
+ self.assertEqual(result4, data)
+
+ conf5 = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+ rdd.saveAsNewAPIHadoopDataset(conf5)
+ result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
+ self.assertEqual(result5, data)
+
+ def test_unbatched_save_and_read(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
+ basepath + "/unbatched/")
+
+ unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_sequence, ei)
+
+ unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopFile, ei)
+
+ unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+ oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopRDD, ei)
+
+ newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopRDD, ei)
+
+ def test_malformed_RDD(self):
+ basepath = self.tempdir.name
+ # non-batch-serialized RDD[[(K, V)]] should be rejected
+ data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
+ rdd = self.sc.parallelize(data, numSlices=len(data))
+ self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
+ basepath + "/malformed/sequence"))
class TestDaemon(unittest.TestCase):
def connect(self, port):
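
Finally, since arrays are not handled out of the box (see the commit message), a sketch of the custom-converter round trip exercised by test_newhadoop above; the path is illustrative and the converter classes are the Spark test helpers referenced in the tests:

from array import array

data = [(1, array('d', [1.0, 2.0, 3.0])), (2, array('d', [3.0, 4.0, 5.0]))]
# Write doubles arrays via a custom ArrayWritable subtype and converter.
sc.parallelize(data).saveAsNewAPIHadoopFile(
    "/tmp/newhadoop",
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
# Read them back as Python arrays with the matching input converter.
result = sorted(sc.newAPIHadoopFile(
    "/tmp/newhadoop",
    "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
# result == data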