diff options
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e4a191a9ef..f2280b5100 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -29,9 +29,10 @@ from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync -from pyspark.sql.types import * from pyspark.sql.types import _create_cls, _parse_datatype_json_string from pyspark.sql.column import Column, _to_seq, _to_java_column +from pyspark.sql.readwriter import DataFrameWriter +from pyspark.sql.types import * __all__ = ["DataFrame", "SchemaRDD", "DataFrameNaFunctions", "DataFrameStatFunctions"] @@ -151,25 +152,6 @@ class DataFrame(object): """ self._jdf.insertInto(tableName, overwrite) - def _java_save_mode(self, mode): - """Returns the Java save mode based on the Python save mode represented by a string. - """ - jSaveMode = self._sc._jvm.org.apache.spark.sql.SaveMode - jmode = jSaveMode.ErrorIfExists - mode = mode.lower() - if mode == "append": - jmode = jSaveMode.Append - elif mode == "overwrite": - jmode = jSaveMode.Overwrite - elif mode == "ignore": - jmode = jSaveMode.Ignore - elif mode == "error": - pass - else: - raise ValueError( - "Only 'append', 'overwrite', 'ignore', and 'error' are acceptable save mode.") - return jmode - def saveAsTable(self, tableName, source=None, mode="error", **options): """Saves the contents of this :class:`DataFrame` to a data source as a table. @@ -185,11 +167,7 @@ class DataFrame(object): * `error`: Throw an exception if data already exists. * `ignore`: Silently ignore this operation if data already exists. """ - if source is None: - source = self.sql_ctx.getConf("spark.sql.sources.default", - "org.apache.spark.sql.parquet") - jmode = self._java_save_mode(mode) - self._jdf.saveAsTable(tableName, source, jmode, options) + self.write.saveAsTable(tableName, source, mode, **options) def save(self, path=None, source=None, mode="error", **options): """Saves the contents of the :class:`DataFrame` to a data source. @@ -206,13 +184,22 @@ class DataFrame(object): * `error`: Throw an exception if data already exists. * `ignore`: Silently ignore this operation if data already exists. """ - if path is not None: - options["path"] = path - if source is None: - source = self.sql_ctx.getConf("spark.sql.sources.default", - "org.apache.spark.sql.parquet") - jmode = self._java_save_mode(mode) - self._jdf.save(source, jmode, options) + return self.write.save(path, source, mode, **options) + + @property + def write(self): + """ + Interface for saving the content of the :class:`DataFrame` out + into external storage. + + :return :class:`DataFrameWriter` + + ::note: Experimental + + >>> df.write + <pyspark.sql.readwriter.DataFrameWriter object at ...> + """ + return DataFrameWriter(self) @property def schema(self): @@ -411,9 +398,19 @@ class DataFrame(object): self._jdf.unpersist(blocking) return self - # def coalesce(self, numPartitions, shuffle=False): - # rdd = self._jdf.coalesce(numPartitions, shuffle, None) - # return DataFrame(rdd, self.sql_ctx) + def coalesce(self, numPartitions): + """ + Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. + + Similar to coalesce defined on an :class:`RDD`, this operation results in a + narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, + there will not be a shuffle, instead each of the 100 new partitions will + claim 10 of the current partitions. + + >>> df.coalesce(1).rdd.getNumPartitions() + 1 + """ + return DataFrame(self._jdf.coalesce(numPartitions), self.sql_ctx) def repartition(self, numPartitions): """Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions. |