aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/context.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-02-27 20:07:17 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-27 20:07:17 -0800
commite0e64ba4b1b8eb72e856286f756c65fa22ab0a36 (patch)
treeca358052d6b572756ecbbf98133a093db3f4cc83 /python/pyspark/sql/context.py
parent8c468a6600e0deb5464990df60148212e64fdecd (diff)
downloadspark-e0e64ba4b1b8eb72e856286f756c65fa22ab0a36.tar.gz
spark-e0e64ba4b1b8eb72e856286f756c65fa22ab0a36.tar.bz2
spark-e0e64ba4b1b8eb72e856286f756c65fa22ab0a36.zip
[SPARK-6055] [PySpark] fix incorrect __eq__ of DataType
The _eq_ of DataType is not correct, class cache is not use correctly (created class can not be find by dataType), then it will create lots of classes (saved in _cached_cls), never released. Also, all same DataType have same hash code, there will be many object in a dict with the same hash code, end with hash attach, it's very slow to access this dict (depends on the implementation of CPython). This PR also improve the performance of inferSchema (avoid the unnecessary converter of object). cc pwendell JoshRosen Author: Davies Liu <davies@databricks.com> Closes #4808 from davies/leak and squashes the following commits: 6a322a4 [Davies Liu] tests refactor 3da44fc [Davies Liu] fix __eq__ of Singleton 534ac90 [Davies Liu] add more checks 46999dc [Davies Liu] fix tests d9ae973 [Davies Liu] fix memory leak in sql
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--python/pyspark/sql/context.py90
1 files changed, 1 insertions, 89 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 5d7aeb664c..795ef0dbc4 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -17,7 +17,6 @@
import warnings
import json
-from array import array
from itertools import imap
from py4j.protocol import Py4JError
@@ -25,7 +24,7 @@ from py4j.java_collections import MapConverter
from pyspark.rdd import RDD, _prepare_for_python_RDD
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
-from pyspark.sql.types import StringType, StructType, _verify_type, \
+from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter
from pyspark.sql.dataframe import DataFrame
@@ -620,93 +619,6 @@ class HiveContext(SQLContext):
return self._jvm.HiveContext(self._jsc.sc())
-def _create_row(fields, values):
- row = Row(*values)
- row.__FIELDS__ = fields
- return row
-
-
-class Row(tuple):
-
- """
- A row in L{DataFrame}. The fields in it can be accessed like attributes.
-
- Row can be used to create a row object by using named arguments,
- the fields will be sorted by names.
-
- >>> row = Row(name="Alice", age=11)
- >>> row
- Row(age=11, name='Alice')
- >>> row.name, row.age
- ('Alice', 11)
-
- Row also can be used to create another Row like class, then it
- could be used to create Row objects, such as
-
- >>> Person = Row("name", "age")
- >>> Person
- <Row(name, age)>
- >>> Person("Alice", 11)
- Row(name='Alice', age=11)
- """
-
- def __new__(self, *args, **kwargs):
- if args and kwargs:
- raise ValueError("Can not use both args "
- "and kwargs to create Row")
- if args:
- # create row class or objects
- return tuple.__new__(self, args)
-
- elif kwargs:
- # create row objects
- names = sorted(kwargs.keys())
- values = tuple(kwargs[n] for n in names)
- row = tuple.__new__(self, values)
- row.__FIELDS__ = names
- return row
-
- else:
- raise ValueError("No args or kwargs")
-
- def asDict(self):
- """
- Return as an dict
- """
- if not hasattr(self, "__FIELDS__"):
- raise TypeError("Cannot convert a Row class into dict")
- return dict(zip(self.__FIELDS__, self))
-
- # let obect acs like class
- def __call__(self, *args):
- """create new Row object"""
- return _create_row(self, args)
-
- def __getattr__(self, item):
- if item.startswith("__"):
- raise AttributeError(item)
- try:
- # it will be slow when it has many fields,
- # but this will not be used in normal cases
- idx = self.__FIELDS__.index(item)
- return self[idx]
- except IndexError:
- raise AttributeError(item)
-
- def __reduce__(self):
- if hasattr(self, "__FIELDS__"):
- return (_create_row, (self.__FIELDS__, tuple(self)))
- else:
- return tuple.__reduce__(self)
-
- def __repr__(self):
- if hasattr(self, "__FIELDS__"):
- return "Row(%s)" % ", ".join("%s=%r" % (k, v)
- for k, v in zip(self.__FIELDS__, self))
- else:
- return "<Row(%s)>" % ", ".join(self)
-
-
def _test():
import doctest
from pyspark.context import SparkContext