author     jiangxingbo <jiangxb1987@gmail.com>    2016-08-24 23:36:04 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-08-24 23:36:04 -0700
commit     5f02d2e5b4d37f554629cbd0e488e856fffd7b6b (patch)
tree       00d1c3d44c017f3960888b37c15b08ec2c6d603e
parent     4d0706d616176dc29ff3562e40cb00dd4eb9c302 (diff)
[SPARK-17215][SQL] Method `SQLContext.parseDataType(dataTypeString: String)` could be removed.
## What changes were proposed in this pull request?

The method `SQLContext.parseDataType(dataTypeString: String)` could be removed; we should use `SparkSession.parseDataType(dataTypeString: String)` instead. This requires updating PySpark.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #14790 from jiangxb1987/parseDataType.
-rw-r--r--  python/pyspark/sql/column.py                                  | 7
-rw-r--r--  python/pyspark/sql/functions.py                               | 6
-rw-r--r--  python/pyspark/sql/readwriter.py                              | 4
-rw-r--r--  python/pyspark/sql/streaming.py                               | 4
-rw-r--r--  python/pyspark/sql/tests.py                                   | 2
-rw-r--r--  python/pyspark/sql/types.py                                   | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 10
7 files changed, 16 insertions, 23 deletions
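
For context, the PySpark-side change replaces every call that went through `SQLContext._ssql_ctx.parseDataType(...)` with a call through the JVM `SparkSession`. A minimal sketch of the new call path, assuming an active `SparkSession`; the `StructType` here is only an illustrative example:

```python
# Minimal sketch of the call path PySpark uses after this change.
# `_jsparkSession` is the internal handle to the JVM-side SparkSession;
# its `parseDataType` takes the JSON form of a DataType and returns the
# corresponding JVM DataType object.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([StructField("id", IntegerType(), True)])

# Old (removed) path: sqlContext._ssql_ctx.parseDataType(schema.json())
# New path:
jdt = spark._jsparkSession.parseDataType(schema.json())
print(jdt.json())  # the JVM DataType round-trips to the same JSON
```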
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 4b99f3058b..8d5adc8ffd 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -328,10 +328,9 @@ class Column(object):
if isinstance(dataType, basestring):
jc = self._jc.cast(dataType)
elif isinstance(dataType, DataType):
- from pyspark.sql import SQLContext
- sc = SparkContext.getOrCreate()
- ctx = SQLContext.getOrCreate(sc)
- jdt = ctx._ssql_ctx.parseDataType(dataType.json())
+ from pyspark.sql import SparkSession
+ spark = SparkSession.builder.getOrCreate()
+ jdt = spark._jsparkSession.parseDataType(dataType.json())
jc = self._jc.cast(jdt)
else:
raise TypeError("unexpected type: %s" % type(dataType))
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 4ea83e24bb..89b3c07c07 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1760,11 +1760,11 @@ class UserDefinedFunction(object):
self._judf = self._create_judf(name)
def _create_judf(self, name):
- from pyspark.sql import SQLContext
+ from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
wrapped_func = _wrap_function(sc, self.func, self.returnType)
- ctx = SQLContext.getOrCreate(sc)
- jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
+ spark = SparkSession.builder.getOrCreate()
+ jdt = spark._jsparkSession.parseDataType(self.returnType.json())
if name is None:
f = self.func
name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3da6f497e9..3d79e0cccc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -98,9 +98,11 @@ class DataFrameReader(OptionUtils):
:param schema: a :class:`pyspark.sql.types.StructType` object
"""
+ from pyspark.sql import SparkSession
if not isinstance(schema, StructType):
raise TypeError("schema should be StructType")
- jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ spark = SparkSession.builder.getOrCreate()
+ jschema = spark._jsparkSession.parseDataType(schema.json())
self._jreader = self._jreader.schema(jschema)
return self
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 3761d2b199..a0ba5825f3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -273,9 +273,11 @@ class DataStreamReader(OptionUtils):
>>> s = spark.readStream.schema(sdf_schema)
"""
+ from pyspark.sql import SparkSession
if not isinstance(schema, StructType):
raise TypeError("schema should be StructType")
- jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ spark = SparkSession.builder.getOrCreate()
+ jschema = spark._jsparkSession.parseDataType(schema.json())
self._jreader = self._jreader.schema(jschema)
return self
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fc41701b59..fd8e9cec3e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -574,7 +574,7 @@ class SQLTests(ReusedPySparkTestCase):
def check_datatype(datatype):
pickled = pickle.loads(pickle.dumps(datatype))
assert datatype == pickled
- scala_datatype = self.spark._wrapped._ssql_ctx.parseDataType(datatype.json())
+ scala_datatype = self.spark._jsparkSession.parseDataType(datatype.json())
python_datatype = _parse_datatype_json_string(scala_datatype.json())
assert datatype == python_datatype
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 11b1e60ee7..4a023123b6 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -851,7 +851,7 @@ def _parse_datatype_json_string(json_string):
>>> def check_datatype(datatype):
... pickled = pickle.loads(pickle.dumps(datatype))
... assert datatype == pickled
- ... scala_datatype = sqlContext._ssql_ctx.parseDataType(datatype.json())
+ ... scala_datatype = spark._jsparkSession.parseDataType(datatype.json())
... python_datatype = _parse_datatype_json_string(scala_datatype.json())
... assert datatype == python_datatype
>>> for cls in _all_atomic_types.values():
@@ -1551,11 +1551,11 @@ register_input_converter(DateConverter())
def _test():
import doctest
from pyspark.context import SparkContext
- from pyspark.sql import SQLContext
+ from pyspark.sql import SparkSession
globs = globals()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
- globs['sqlContext'] = SQLContext(sc)
+ globs['spark'] = SparkSession.builder.getOrCreate()
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7627ac2c9..fbf22197a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -743,16 +743,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)
sparkSession.catalog.listTables(databaseName).collect().map(_.name)
}
- /**
- * Parses the data type in our internal string representation. The data type string should
- * have the same format as the one generated by `toString` in scala.
- * It is only used by PySpark.
- */
- // TODO: Remove this function (would require updating PySpark).
- private[sql] def parseDataType(dataTypeString: String): DataType = {
- DataType.fromJson(dataTypeString)
- }
-
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Deprecated methods