diff options
author | Eric Liang <ekl@databricks.com> | 2016-09-14 13:37:35 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-09-14 13:37:35 -0700 |
commit | dbfc7aa4d0d5457bc92e1e66d065c6088d476843 (patch) | |
tree | c9b7a22d8d695fc155024a0830d95b2e6b8f6b6b /python/pyspark | |
parent | e33bfaed3b160fbc617c878067af17477a0044f5 (diff) | |
download | spark-dbfc7aa4d0d5457bc92e1e66d065c6088d476843.tar.gz spark-dbfc7aa4d0d5457bc92e1e66d065c6088d476843.tar.bz2 spark-dbfc7aa4d0d5457bc92e1e66d065c6088d476843.zip |
[SPARK-17472] [PYSPARK] Better error message for serialization failures of large objects in Python
## What changes were proposed in this pull request?
For large objects, pickle does not raise useful error messages. However, we can wrap them to be slightly more user friendly:
Example 1:
```
def run():
import numpy.random as nr
b = nr.bytes(8 * 1000000000)
sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
run()
```
Before:
```
error: 'i' format requires -2147483648 <= number <= 2147483647
```
After:
```
pickle.PicklingError: Object too large to serialize: 'i' format requires -2147483648 <= number <= 2147483647
```
Example 2:
```
def run():
import numpy.random as nr
b = sc.broadcast(nr.bytes(8 * 1000000000))
sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
run()
```
Before:
```
SystemError: error return without exception set
```
After:
```
cPickle.PicklingError: Could not serialize broadcast: SystemError: error return without exception set
```
## How was this patch tested?
Manually tried out these cases
cc davies
Author: Eric Liang <ekl@databricks.com>
Closes #15026 from ericl/spark-17472.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/broadcast.py | 11 | ||||
-rw-r--r-- | python/pyspark/cloudpickle.py | 10 |
2 files changed, 20 insertions, 1 deletions
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index a0b819220e..74dee14207 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -20,6 +20,8 @@ import sys import gc from tempfile import NamedTemporaryFile +from pyspark.cloudpickle import print_exec + if sys.version < '3': import cPickle as pickle else: @@ -75,7 +77,14 @@ class Broadcast(object): self._path = path def dump(self, value, f): - pickle.dump(value, f, 2) + try: + pickle.dump(value, f, 2) + except pickle.PickleError: + raise + except Exception as e: + msg = "Could not serialize broadcast: " + e.__class__.__name__ + ": " + e.message + print_exec(sys.stderr) + raise pickle.PicklingError(msg) f.close() return f.name diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 822ae46e45..da2b2f3757 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -109,6 +109,16 @@ class CloudPickler(Pickler): if 'recursion' in e.args[0]: msg = """Could not pickle object as excessively deep recursion required.""" raise pickle.PicklingError(msg) + except pickle.PickleError: + raise + except Exception as e: + if "'i' format requires" in e.message: + msg = "Object too large to serialize: " + e.message + else: + msg = "Could not serialize object: " + e.__class__.__name__ + ": " + e.message + print_exec(sys.stderr) + raise pickle.PicklingError(msg) + def save_memoryview(self, obj): """Fallback to save_string""" |