-rw-r--r-- | python/pyspark/sql/context.py | 5
-rw-r--r-- | python/pyspark/sql/dataframe.py | 16
-rw-r--r-- | python/pyspark/sql/tests.py | 28
-rw-r--r-- | python/pyspark/sql/types.py | 419
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala | 12
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 5
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala | 122
7 files changed, 292 insertions, 315 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 309c11faf9..c93a15bada 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -30,7 +30,7 @@ from pyspark.rdd import RDD, _prepare_for_python_RDD, ignore_unicode_prefix from pyspark.serializers import AutoBatchedSerializer, PickleSerializer from pyspark.sql import since from pyspark.sql.types import Row, StringType, StructType, _verify_type, \ - _infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter + _infer_schema, _has_nulltype, _merge_type, _create_converter from pyspark.sql.dataframe import DataFrame from pyspark.sql.readwriter import DataFrameReader from pyspark.sql.utils import install_exception_handler @@ -388,8 +388,7 @@ class SQLContext(object): raise TypeError("schema should be StructType or list or None") # convert python objects to sql data - converter = _python_to_sql_converter(schema) - rdd = rdd.map(converter) + rdd = rdd.map(schema.toInternal) jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 1e9c657cf8..83e02b85f0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -31,7 +31,7 @@ from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deseria from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql import since -from pyspark.sql.types import _create_cls, _parse_datatype_json_string +from pyspark.sql.types import _parse_datatype_json_string from pyspark.sql.column import Column, _to_seq, _to_java_column from pyspark.sql.readwriter import DataFrameWriter from pyspark.sql.types import * @@ -83,15 +83,7 @@ class DataFrame(object): """ if self._lazy_rdd is None: jrdd = self._jdf.javaToPython() - rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) - schema = self.schema - - def applySchema(it): - cls = _create_cls(schema) - return map(cls, it) - - self._lazy_rdd = rdd.mapPartitions(applySchema) - + self._lazy_rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) return self._lazy_rdd @property @@ -287,9 +279,7 @@ class DataFrame(object): """ with SCCallSiteSync(self._sc) as css: port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd()) - rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) - cls = _create_cls(self.schema) - return [cls(r) for r in rs] + return list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) @ignore_unicode_prefix @since(1.3) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66827d4885..4d7cad5a1a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -151,6 +151,17 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(self.sqlCtx.range(-2).count(), 0) self.assertEqual(self.sqlCtx.range(3).count(), 3) + def test_duplicated_column_names(self): + df = self.sqlCtx.createDataFrame([(1, 2)], ["c", "c"]) + row = df.select('*').first() + self.assertEqual(1, row[0]) + self.assertEqual(2, row[1]) + self.assertEqual("Row(c=1, c=2)", str(row)) + # Cannot access columns + self.assertRaises(AnalysisException, lambda: df.select(df[0]).first()) + self.assertRaises(AnalysisException, lambda: df.select(df.c).first()) + self.assertRaises(AnalysisException, lambda: df.select(df["c"]).first()) + def 
test_explode(self): from pyspark.sql.functions import explode d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})] @@ -401,6 +412,14 @@ class SQLTests(ReusedPySparkTestCase): point = df.head().point self.assertEquals(point, ExamplePoint(1.0, 2.0)) + def test_udf_with_udt(self): + from pyspark.sql.tests import ExamplePoint + row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) + df = self.sc.parallelize([row]).toDF() + self.assertEqual(1.0, df.map(lambda r: r.point.x).first()) + udf = UserDefinedFunction(lambda p: p.y, DoubleType()) + self.assertEqual(2.0, df.select(udf(df.point)).first()[0]) + def test_parquet_with_udt(self): from pyspark.sql.tests import ExamplePoint row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) @@ -693,12 +712,9 @@ class SQLTests(ReusedPySparkTestCase): utcnow = datetime.datetime.fromtimestamp(ts, utc) df = self.sqlCtx.createDataFrame([(day, now, utcnow)]) day1, now1, utcnow1 = df.first() - # Pyrolite serialize java.sql.Date as datetime, will be fixed in new version - self.assertEqual(day1.date(), day) - # Pyrolite does not support microsecond, the error should be - # less than 1 millisecond - self.assertTrue(now - now1 < datetime.timedelta(0.001)) - self.assertTrue(now - utcnow1 < datetime.timedelta(0.001)) + self.assertEqual(day1, day) + self.assertEqual(now, now1) + self.assertEqual(now, utcnow1) def test_decimal(self): from decimal import Decimal diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index fecfe6d71e..d638576916 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -20,13 +20,9 @@ import decimal import time import datetime import calendar -import keyword -import warnings import json import re -import weakref from array import array -from operator import itemgetter if sys.version >= "3": long = int @@ -71,6 +67,26 @@ class DataType(object): separators=(',', ':'), sort_keys=True) + def needConversion(self): + """ + Does this type need to conversion between Python object and internal SQL object. + + This is used to avoid the unnecessary conversion for ArrayType/MapType/StructType. + """ + return False + + def toInternal(self, obj): + """ + Converts a Python object into an internal SQL object. + """ + return obj + + def fromInternal(self, obj): + """ + Converts an internal SQL object into a native Python object. + """ + return obj + # This singleton pattern does not work with pickle, you will get # another object after pickle and unpickle @@ -143,6 +159,17 @@ class DateType(AtomicType): __metaclass__ = DataTypeSingleton + EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() + + def needConversion(self): + return True + + def toInternal(self, d): + return d and d.toordinal() - self.EPOCH_ORDINAL + + def fromInternal(self, v): + return v and datetime.date.fromordinal(v + self.EPOCH_ORDINAL) + class TimestampType(AtomicType): """Timestamp (datetime.datetime) data type. @@ -150,6 +177,19 @@ class TimestampType(AtomicType): __metaclass__ = DataTypeSingleton + def needConversion(self): + return True + + def toInternal(self, dt): + if dt is not None: + seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) + return int(seconds * 1e6 + dt.microsecond) + + def fromInternal(self, ts): + if ts is not None: + return datetime.datetime.fromtimestamp(ts / 1e6) + class DecimalType(FractionalType): """Decimal (decimal.Decimal) data type. 
@@ -259,6 +299,19 @@ class ArrayType(DataType): return ArrayType(_parse_datatype_json_value(json["elementType"]), json["containsNull"]) + def needConversion(self): + return self.elementType.needConversion() + + def toInternal(self, obj): + if not self.needConversion(): + return obj + return obj and [self.elementType.toInternal(v) for v in obj] + + def fromInternal(self, obj): + if not self.needConversion(): + return obj + return obj and [self.elementType.fromInternal(v) for v in obj] + class MapType(DataType): """Map data type. @@ -304,6 +357,21 @@ class MapType(DataType): _parse_datatype_json_value(json["valueType"]), json["valueContainsNull"]) + def needConversion(self): + return self.keyType.needConversion() or self.valueType.needConversion() + + def toInternal(self, obj): + if not self.needConversion(): + return obj + return obj and dict((self.keyType.toInternal(k), self.valueType.toInternal(v)) + for k, v in obj.items()) + + def fromInternal(self, obj): + if not self.needConversion(): + return obj + return obj and dict((self.keyType.fromInternal(k), self.valueType.fromInternal(v)) + for k, v in obj.items()) + class StructField(DataType): """A field in :class:`StructType`. @@ -311,7 +379,7 @@ class StructField(DataType): :param name: string, name of the field. :param dataType: :class:`DataType` of the field. :param nullable: boolean, whether the field can be null (None) or not. - :param metadata: a dict from string to simple type that can be serialized to JSON automatically + :param metadata: a dict from string to simple type that can be toInternald to JSON automatically """ def __init__(self, name, dataType, nullable=True, metadata=None): @@ -351,6 +419,15 @@ class StructField(DataType): json["nullable"], json["metadata"]) + def needConversion(self): + return self.dataType.needConversion() + + def toInternal(self, obj): + return self.dataType.toInternal(obj) + + def fromInternal(self, obj): + return self.dataType.fromInternal(obj) + class StructType(DataType): """Struct type, consisting of a list of :class:`StructField`. 
@@ -371,10 +448,13 @@ class StructType(DataType): """ if not fields: self.fields = [] + self.names = [] else: self.fields = fields + self.names = [f.name for f in fields] assert all(isinstance(f, StructField) for f in fields),\ "fields should be a list of StructField" + self._needSerializeFields = None def add(self, field, data_type=None, nullable=True, metadata=None): """ @@ -406,6 +486,7 @@ class StructType(DataType): """ if isinstance(field, StructField): self.fields.append(field) + self.names.append(field.name) else: if isinstance(field, str) and data_type is None: raise ValueError("Must specify DataType if passing name of struct_field to create.") @@ -415,6 +496,7 @@ class StructType(DataType): else: data_type_f = data_type self.fields.append(StructField(field, data_type_f, nullable, metadata)) + self.names.append(field) return self def simpleString(self): @@ -432,6 +514,41 @@ class StructType(DataType): def fromJson(cls, json): return StructType([StructField.fromJson(f) for f in json["fields"]]) + def needConversion(self): + # We need convert Row()/namedtuple into tuple() + return True + + def toInternal(self, obj): + if obj is None: + return + + if self._needSerializeFields is None: + self._needSerializeFields = any(f.needConversion() for f in self.fields) + + if self._needSerializeFields: + if isinstance(obj, dict): + return tuple(f.toInternal(obj.get(n)) for n, f in zip(names, self.fields)) + elif isinstance(obj, (tuple, list)): + return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) + else: + raise ValueError("Unexpected tuple %r with StructType" % obj) + else: + if isinstance(obj, dict): + return tuple(obj.get(n) for n in self.names) + elif isinstance(obj, (list, tuple)): + return tuple(obj) + else: + raise ValueError("Unexpected tuple %r with StructType" % obj) + + def fromInternal(self, obj): + if obj is None: + return + if isinstance(obj, Row): + # it's already converted by pickler + return obj + values = [f.dataType.fromInternal(v) for f, v in zip(self.fields, obj)] + return _create_row(self.names, values) + class UserDefinedType(DataType): """User-defined type (UDT). @@ -464,17 +581,35 @@ class UserDefinedType(DataType): """ raise NotImplementedError("UDT must have a paired Scala UDT.") + def needConversion(self): + return True + + @classmethod + def _cachedSqlType(cls): + """ + Cache the sqlType() into class, because it's heavy used in `toInternal`. + """ + if not hasattr(cls, "_cached_sql_type"): + cls._cached_sql_type = cls.sqlType() + return cls._cached_sql_type + + def toInternal(self, obj): + return self._cachedSqlType().toInternal(self.serialize(obj)) + + def fromInternal(self, obj): + return self.deserialize(self._cachedSqlType().fromInternal(obj)) + def serialize(self, obj): """ Converts the a user-type object into a SQL datum. """ - raise NotImplementedError("UDT must implement serialize().") + raise NotImplementedError("UDT must implement toInternal().") def deserialize(self, datum): """ Converts a SQL datum into a user-type object. """ - raise NotImplementedError("UDT must implement deserialize().") + raise NotImplementedError("UDT must implement fromInternal().") def simpleString(self): return 'udt' @@ -671,117 +806,6 @@ def _infer_schema(row): return StructType(fields) -def _need_python_to_sql_conversion(dataType): - """ - Checks whether we need python to sql conversion for the given type. - For now, only UDTs need this conversion. 
- - >>> _need_python_to_sql_conversion(DoubleType()) - False - >>> schema0 = StructType([StructField("indices", ArrayType(IntegerType(), False), False), - ... StructField("values", ArrayType(DoubleType(), False), False)]) - >>> _need_python_to_sql_conversion(schema0) - True - >>> _need_python_to_sql_conversion(ExamplePointUDT()) - True - >>> schema1 = ArrayType(ExamplePointUDT(), False) - >>> _need_python_to_sql_conversion(schema1) - True - >>> schema2 = StructType([StructField("label", DoubleType(), False), - ... StructField("point", ExamplePointUDT(), False)]) - >>> _need_python_to_sql_conversion(schema2) - True - """ - if isinstance(dataType, StructType): - # convert namedtuple or Row into tuple - return True - elif isinstance(dataType, ArrayType): - return _need_python_to_sql_conversion(dataType.elementType) - elif isinstance(dataType, MapType): - return _need_python_to_sql_conversion(dataType.keyType) or \ - _need_python_to_sql_conversion(dataType.valueType) - elif isinstance(dataType, UserDefinedType): - return True - elif isinstance(dataType, (DateType, TimestampType)): - return True - else: - return False - - -EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() - - -def _python_to_sql_converter(dataType): - """ - Returns a converter that converts a Python object into a SQL datum for the given type. - - >>> conv = _python_to_sql_converter(DoubleType()) - >>> conv(1.0) - 1.0 - >>> conv = _python_to_sql_converter(ArrayType(DoubleType(), False)) - >>> conv([1.0, 2.0]) - [1.0, 2.0] - >>> conv = _python_to_sql_converter(ExamplePointUDT()) - >>> conv(ExamplePoint(1.0, 2.0)) - [1.0, 2.0] - >>> schema = StructType([StructField("label", DoubleType(), False), - ... StructField("point", ExamplePointUDT(), False)]) - >>> conv = _python_to_sql_converter(schema) - >>> conv((1.0, ExamplePoint(1.0, 2.0))) - (1.0, [1.0, 2.0]) - """ - if not _need_python_to_sql_conversion(dataType): - return lambda x: x - - if isinstance(dataType, StructType): - names, types = zip(*[(f.name, f.dataType) for f in dataType.fields]) - if any(_need_python_to_sql_conversion(t) for t in types): - converters = [_python_to_sql_converter(t) for t in types] - - def converter(obj): - if isinstance(obj, dict): - return tuple(c(obj.get(n)) for n, c in zip(names, converters)) - elif isinstance(obj, tuple): - if hasattr(obj, "__fields__") or hasattr(obj, "_fields"): - return tuple(c(v) for c, v in zip(converters, obj)) - else: - return tuple(c(v) for c, v in zip(converters, obj)) - elif obj is not None: - raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType)) - else: - def converter(obj): - if isinstance(obj, dict): - return tuple(obj.get(n) for n in names) - else: - return tuple(obj) - return converter - elif isinstance(dataType, ArrayType): - element_converter = _python_to_sql_converter(dataType.elementType) - return lambda a: a and [element_converter(v) for v in a] - elif isinstance(dataType, MapType): - key_converter = _python_to_sql_converter(dataType.keyType) - value_converter = _python_to_sql_converter(dataType.valueType) - return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) - - elif isinstance(dataType, UserDefinedType): - return lambda obj: obj and dataType.serialize(obj) - - elif isinstance(dataType, DateType): - return lambda d: d and d.toordinal() - EPOCH_ORDINAL - - elif isinstance(dataType, TimestampType): - - def to_posix_timstamp(dt): - if dt: - seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) - return 
int(seconds * 1e6 + dt.microsecond) - return to_posix_timstamp - - else: - raise ValueError("Unexpected type %r" % dataType) - - def _has_nulltype(dt): """ Return whether there is NullType in `dt` or not """ if isinstance(dt, StructType): @@ -1076,7 +1100,7 @@ def _verify_type(obj, dataType): if isinstance(dataType, UserDefinedType): if not (hasattr(obj, '__UDT__') and obj.__UDT__ == dataType): raise ValueError("%r is not an instance of type %r" % (obj, dataType)) - _verify_type(dataType.serialize(obj), dataType.sqlType()) + _verify_type(dataType.toInternal(obj), dataType.sqlType()) return _type = type(dataType) @@ -1086,7 +1110,7 @@ def _verify_type(obj, dataType): if not isinstance(obj, (tuple, list)): raise TypeError("StructType can not accept object in type %s" % type(obj)) else: - # subclass of them can not be deserialized in JVM + # subclass of them can not be fromInternald in JVM if type(obj) not in _acceptable_types[_type]: raise TypeError("%s can not accept object in type %s" % (dataType, type(obj))) @@ -1106,159 +1130,10 @@ def _verify_type(obj, dataType): for v, f in zip(obj, dataType.fields): _verify_type(v, f.dataType) -_cached_cls = weakref.WeakValueDictionary() - - -def _restore_object(dataType, obj): - """ Restore object during unpickling. """ - # use id(dataType) as key to speed up lookup in dict - # Because of batched pickling, dataType will be the - # same object in most cases. - k = id(dataType) - cls = _cached_cls.get(k) - if cls is None or cls.__datatype is not dataType: - # use dataType as key to avoid create multiple class - cls = _cached_cls.get(dataType) - if cls is None: - cls = _create_cls(dataType) - _cached_cls[dataType] = cls - cls.__datatype = dataType - _cached_cls[k] = cls - return cls(obj) - - -def _create_object(cls, v): - """ Create an customized object with class `cls`. """ - # datetime.date would be deserialized as datetime.datetime - # from java type, so we need to set it back. - if cls is datetime.date and isinstance(v, datetime.datetime): - return v.date() - return cls(v) if v is not None else v - - -def _create_getter(dt, i): - """ Create a getter for item `i` with schema """ - cls = _create_cls(dt) - - def getter(self): - return _create_object(cls, self[i]) - - return getter - - -def _has_struct_or_date(dt): - """Return whether `dt` is or has StructType/DateType in it""" - if isinstance(dt, StructType): - return True - elif isinstance(dt, ArrayType): - return _has_struct_or_date(dt.elementType) - elif isinstance(dt, MapType): - return _has_struct_or_date(dt.keyType) or _has_struct_or_date(dt.valueType) - elif isinstance(dt, DateType): - return True - elif isinstance(dt, UserDefinedType): - return True - return False - - -def _create_properties(fields): - """Create properties according to fields""" - ps = {} - for i, f in enumerate(fields): - name = f.name - if (name.startswith("__") and name.endswith("__") - or keyword.iskeyword(name)): - warnings.warn("field name %s can not be accessed in Python," - "use position to access it instead" % name) - if _has_struct_or_date(f.dataType): - # delay creating object until accessing it - getter = _create_getter(f.dataType, i) - else: - getter = itemgetter(i) - ps[name] = property(getter) - return ps - - -def _create_cls(dataType): - """ - Create an class by dataType - - The created class is similar to namedtuple, but can have nested schema. 
- - >>> schema = _parse_schema_abstract("a b c") - >>> row = (1, 1.0, "str") - >>> schema = _infer_schema_type(row, schema) - >>> obj = _create_cls(schema)(row) - >>> import pickle - >>> pickle.loads(pickle.dumps(obj)) - Row(a=1, b=1.0, c='str') - - >>> row = [[1], {"key": (1, 2.0)}] - >>> schema = _parse_schema_abstract("a[] b{c d}") - >>> schema = _infer_schema_type(row, schema) - >>> obj = _create_cls(schema)(row) - >>> pickle.loads(pickle.dumps(obj)) - Row(a=[1], b={'key': Row(c=1, d=2.0)}) - >>> pickle.loads(pickle.dumps(obj.a)) - [1] - >>> pickle.loads(pickle.dumps(obj.b)) - {'key': Row(c=1, d=2.0)} - """ - - if isinstance(dataType, ArrayType): - cls = _create_cls(dataType.elementType) - - def List(l): - if l is None: - return - return [_create_object(cls, v) for v in l] - - return List - - elif isinstance(dataType, MapType): - kcls = _create_cls(dataType.keyType) - vcls = _create_cls(dataType.valueType) - - def Dict(d): - if d is None: - return - return dict((_create_object(kcls, k), _create_object(vcls, v)) for k, v in d.items()) - - return Dict - - elif isinstance(dataType, DateType): - return datetime.date - - elif isinstance(dataType, UserDefinedType): - return lambda datum: dataType.deserialize(datum) - - elif not isinstance(dataType, StructType): - # no wrapper for atomic types - return lambda x: x - - class Row(tuple): - - """ Row in DataFrame """ - __datatype = dataType - __fields__ = tuple(f.name for f in dataType.fields) - __slots__ = () - - # create property for fast access - locals().update(_create_properties(dataType.fields)) - - def asDict(self): - """ Return as a dict """ - return dict((n, getattr(self, n)) for n in self.__fields__) - - def __repr__(self): - # call collect __repr__ for nested objects - return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n)) - for n in self.__fields__)) - - def __reduce__(self): - return (_restore_object, (self.__datatype, tuple(self))) - return Row +# This is used to unpickle a Row from JVM +def _create_row_inbound_converter(dataType): + return lambda *a: dataType.fromInternal(a) def _create_row(fields, values): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index 8b472a529e..094904bbf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -132,6 +132,18 @@ class GenericInternalRow(protected[sql] val values: Array[Any]) override def copy(): InternalRow = this } +/** + * This is used for serialization of Python DataFrame + */ +class GenericInternalRowWithSchema(values: Array[Any], override val schema: StructType) + extends GenericInternalRow(values) { + + /** No-arg constructor for serialization. */ + protected def this() = this(null, null) + + override def fieldIndex(name: String): Int = schema.fieldIndex(name) +} + class GenericMutableRow(val values: Array[Any]) extends MutableRow with ArrayBackedRow { /** No-arg constructor for serialization. 
*/ protected def this() = this(null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d9f987ae02..d7966651b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -30,7 +30,6 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ @@ -1550,8 +1549,8 @@ class DataFrame private[sql]( */ protected[sql] def javaToPython: JavaRDD[Array[Byte]] = { val structType = schema // capture it for closure - val jrdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType)).toJavaRDD() - SerDeUtil.javaToPython(jrdd) + val rdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType)) + EvaluatePython.javaToPython(rdd) } //////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala index 1c8130b07c..6d6e67dace 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala @@ -17,15 +17,16 @@ package org.apache.spark.sql.execution +import java.io.OutputStream import java.util.{List => JList, Map => JMap} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import net.razorvine.pickle.{Pickler, Unpickler} +import net.razorvine.pickle._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} +import org.apache.spark.api.python.{PythonBroadcast, PythonRDD, SerDeUtil} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -33,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.{Accumulator, Logging => SparkLogging} @@ -130,8 +130,13 @@ object EvaluatePython { case (null, _) => null case (row: InternalRow, struct: StructType) => - val fields = struct.fields.map(field => field.dataType) - rowToArray(row, fields) + val values = new Array[Any](row.size) + var i = 0 + while (i < row.size) { + values(i) = toJava(row(i), struct.fields(i).dataType) + i += 1 + } + new GenericInternalRowWithSchema(values, struct) case (seq: Seq[Any], array: ArrayType) => seq.map(x => toJava(x, array.elementType)).asJava @@ -142,9 +147,6 @@ object EvaluatePython { case (ud, udt: UserDefinedType[_]) => toJava(ud, udt.sqlType) - case (date: Int, DateType) => DateTimeUtils.toJavaDate(date) - case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t) - case (d: Decimal, _) => d.toJavaBigDecimal case (s: UTF8String, StringType) => s.toString @@ -153,14 +155,6 @@ object EvaluatePython { } /** - * Convert Row into Java Array (for pickled into Python) - */ - def rowToArray(row: InternalRow, fields: Seq[DataType]): 
Array[Any] = { - // TODO: this is slow! - row.toSeq.zip(fields).map {case (obj, dt) => toJava(obj, dt)}.toArray - } - - /** * Converts `obj` to the type specified by the data type, or returns null if the type of obj is * unexpected. Because Python doesn't enforce the type. */ @@ -220,6 +214,96 @@ object EvaluatePython { // TODO(davies): we could improve this by try to cast the object to expected type case (c, _) => null } + + + private val module = "pyspark.sql.types" + + /** + * Pickler for StructType + */ + private class StructTypePickler extends IObjectPickler { + + private val cls = classOf[StructType] + + def register(): Unit = { + Pickler.registerCustomPickler(cls, this) + } + + def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + out.write(Opcodes.GLOBAL) + out.write((module + "\n" + "_parse_datatype_json_string" + "\n").getBytes("utf-8")) + val schema = obj.asInstanceOf[StructType] + pickler.save(schema.json) + out.write(Opcodes.TUPLE1) + out.write(Opcodes.REDUCE) + } + } + + /** + * Pickler for InternalRow + */ + private class RowPickler extends IObjectPickler { + + private val cls = classOf[GenericInternalRowWithSchema] + + // register this to Pickler and Unpickler + def register(): Unit = { + Pickler.registerCustomPickler(this.getClass, this) + Pickler.registerCustomPickler(cls, this) + } + + def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { + if (obj == this) { + out.write(Opcodes.GLOBAL) + out.write((module + "\n" + "_create_row_inbound_converter" + "\n").getBytes("utf-8")) + } else { + // it will be memorized by Pickler to save some bytes + pickler.save(this) + val row = obj.asInstanceOf[GenericInternalRowWithSchema] + // schema should always be same object for memoization + pickler.save(row.schema) + out.write(Opcodes.TUPLE1) + out.write(Opcodes.REDUCE) + + out.write(Opcodes.MARK) + var i = 0 + while (i < row.values.size) { + pickler.save(row.values(i)) + i += 1 + } + row.values.foreach(pickler.save) + out.write(Opcodes.TUPLE) + out.write(Opcodes.REDUCE) + } + } + } + + private[this] var registered = false + /** + * This should be called before trying to serialize any above classes un cluster mode, + * this should be put in the closure + */ + def registerPicklers(): Unit = { + synchronized { + if (!registered) { + SerDeUtil.initialize() + new StructTypePickler().register() + new RowPickler().register() + registered = true + } + } + } + + /** + * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by + * PySpark. + */ + def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = { + rdd.mapPartitions { iter => + registerPicklers() // let it called in executor + new SerDeUtil.AutoBatchedPickler(iter) + } + } } /** @@ -254,12 +338,14 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: val childResults = child.execute().map(_.copy()) val parent = childResults.mapPartitions { iter => + EvaluatePython.registerPicklers() // register pickler for Row val pickle = new Pickler val currentRow = newMutableProjection(udf.children, child.output)() val fields = udf.children.map(_.dataType) - iter.grouped(1000).map { inputRows => + val schema = new StructType(fields.map(t => new StructField("", t, true)).toArray) + iter.grouped(100).map { inputRows => val toBePickled = inputRows.map { row => - EvaluatePython.rowToArray(currentRow(row), fields) + EvaluatePython.toJava(currentRow(row), schema) }.toArray pickle.dumps(toBePickled) } |
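
Note on the new DataType.needConversion()/toInternal()/fromInternal() contract added in types.py above: dates are stored internally as days since the Unix epoch and timestamps as microseconds. The following standalone sketch mirrors that logic outside Spark; the helper names (date_to_internal and friends) are illustrative and are not PySpark APIs.

    # Standalone sketch of the conversion logic added to DateType/TimestampType
    # in python/pyspark/sql/types.py; helper names here are illustrative only.
    import calendar
    import datetime
    import time

    EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()

    def date_to_internal(d):
        # None passes through; otherwise days since 1970-01-01
        return d and d.toordinal() - EPOCH_ORDINAL

    def date_from_internal(v):
        return v and datetime.date.fromordinal(v + EPOCH_ORDINAL)

    def timestamp_to_internal(dt):
        # microseconds since the epoch; naive datetimes are taken as local time
        if dt is not None:
            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                       else time.mktime(dt.timetuple()))
            return int(seconds * 1e6 + dt.microsecond)

    def timestamp_from_internal(ts):
        if ts is not None:
            return datetime.datetime.fromtimestamp(ts / 1e6)

    d = datetime.date(2015, 7, 1)
    assert date_from_internal(date_to_internal(d)) == d
    now = datetime.datetime.now()
    assert abs(timestamp_from_internal(timestamp_to_internal(now)) - now) < datetime.timedelta(seconds=1)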
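
With this patch the schema object performs the conversion itself, which is what SQLContext.createDataFrame now applies per record via rdd.map(schema.toInternal) instead of building a dedicated converter. A rough usage sketch, assuming a PySpark build that already contains this change (1.5.x or later):

    # toInternal()/fromInternal() round trip on a schema
    import datetime
    from pyspark.sql.types import Row, StringType, StructField, StructType, TimestampType

    schema = StructType([
        StructField("name", StringType(), False),
        StructField("ts", TimestampType(), False),
    ])

    row = Row(name="a", ts=datetime.datetime(2015, 7, 1, 12, 0, 0))
    internal = schema.toInternal(row)        # plain tuple; timestamp as microseconds
    restored = schema.fromInternal(internal)
    print(internal)
    print(restored)   # Row(name='a', ts=datetime.datetime(2015, 7, 1, 12, 0))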
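
The container types only pay that conversion cost when their element types need it, which is what needConversion() short-circuits. Again assuming a build with this patch:

    import datetime
    from pyspark.sql.types import ArrayType, DateType, IntegerType

    print(ArrayType(IntegerType()).needConversion())   # False: toInternal() is a no-op
    print(ArrayType(DateType()).needConversion())      # True

    arr = ArrayType(DateType())
    days = arr.toInternal([datetime.date(2015, 7, 1), None])
    print(days)                  # [16617, None] -- days since 1970-01-01, None preserved
    print(arr.fromInternal(days))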
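
On the receiving side, the Scala RowPickler above emits a pickle stream that rebuilds each record by effectively calling _create_row_inbound_converter(schema)(*values). That private helper (not a public API, exercised here only for illustration) produces a Row carrying the schema's field names, including the duplicated-name case covered by test_duplicated_column_names:

    from pyspark.sql.types import LongType, StructField, StructType, _create_row_inbound_converter

    schema = StructType([StructField("c", LongType()), StructField("c", LongType())])
    row = _create_row_inbound_converter(schema)(1, 2)
    print(row)               # Row(c=1, c=2)
    print(row[0], row[1])    # columns with duplicated names stay accessible by position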