about summary refs log tree commit diff
path: root/python/pyspark/sql/dataframe.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r--python/pyspark/sql/dataframe.py67
1 file changed, 32 insertions(+), 35 deletions(-)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e4a191a9ef..f2280b5100 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -29,9 +29,10 @@ from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
-from pyspark.sql.types import *
from pyspark.sql.types import _create_cls, _parse_datatype_json_string
from pyspark.sql.column import Column, _to_seq, _to_java_column
+from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.types import *
__all__ = ["DataFrame", "SchemaRDD", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -151,25 +152,6 @@ class DataFrame(object):
"""
self._jdf.insertInto(tableName, overwrite)
- def _java_save_mode(self, mode):
- """Returns the Java save mode based on the Python save mode represented by a string.
- """
- jSaveMode = self._sc._jvm.org.apache.spark.sql.SaveMode
- jmode = jSaveMode.ErrorIfExists
- mode = mode.lower()
- if mode == "append":
- jmode = jSaveMode.Append
- elif mode == "overwrite":
- jmode = jSaveMode.Overwrite
- elif mode == "ignore":
- jmode = jSaveMode.Ignore
- elif mode == "error":
- pass
- else:
- raise ValueError(
- "Only 'append', 'overwrite', 'ignore', and 'error' are acceptable save mode.")
- return jmode
-
def saveAsTable(self, tableName, source=None, mode="error", **options):
"""Saves the contents of this :class:`DataFrame` to a data source as a table.
@@ -185,11 +167,7 @@ class DataFrame(object):
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
"""
- if source is None:
- source = self.sql_ctx.getConf("spark.sql.sources.default",
- "org.apache.spark.sql.parquet")
- jmode = self._java_save_mode(mode)
- self._jdf.saveAsTable(tableName, source, jmode, options)
+ self.write.saveAsTable(tableName, source, mode, **options)
def save(self, path=None, source=None, mode="error", **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
@@ -206,13 +184,22 @@ class DataFrame(object):
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
"""
- if path is not None:
- options["path"] = path
- if source is None:
- source = self.sql_ctx.getConf("spark.sql.sources.default",
- "org.apache.spark.sql.parquet")
- jmode = self._java_save_mode(mode)
- self._jdf.save(source, jmode, options)
+ return self.write.save(path, source, mode, **options)
+
+ @property
+ def write(self):
+ """
+ Interface for saving the content of the :class:`DataFrame` out
+ into external storage.
+
+ :return: :class:`DataFrameWriter`
+
+ .. note:: Experimental
+
+ >>> df.write
+ <pyspark.sql.readwriter.DataFrameWriter object at ...>
+ """
+ return DataFrameWriter(self)
@property
def schema(self):
@@ -411,9 +398,19 @@ class DataFrame(object):
self._jdf.unpersist(blocking)
return self
- # def coalesce(self, numPartitions, shuffle=False):
- # rdd = self._jdf.coalesce(numPartitions, shuffle, None)
- # return DataFrame(rdd, self.sql_ctx)
+ def coalesce(self, numPartitions):
+ """
+ Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.
+
+ Similar to coalesce defined on an :class:`RDD`, this operation results in a
+ narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
+ there will not be a shuffle, instead each of the 100 new partitions will
+ claim 10 of the current partitions.
+
+ >>> df.coalesce(1).rdd.getNumPartitions()
+ 1
+ """
+ return DataFrame(self._jdf.coalesce(numPartitions), self.sql_ctx)
def repartition(self, numPartitions):
"""Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.