author     Burak Yavuz <brkyvz@gmail.com>            2016-11-21 17:24:02 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2016-11-21 17:24:02 -0800
commit     97a8239a625df455d2c439f3628a529d6d9413ca (patch)
tree       e34deed31311422c708c4e48a95f2611092a441e
parent     a2d464770cd183daa7d727bf377bde9c21e29e6a (diff)
[SPARK-18493] Add missing Python APIs: withWatermark and checkpoint to DataFrame
## What changes were proposed in this pull request?

This PR adds two of the newly added methods of `Dataset`s to Python: `withWatermark` and `checkpoint`.

## How was this patch tested?

Doc tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15921 from brkyvz/py-watermark.
-rw-r--r--  python/pyspark/sql/dataframe.py                              |  57
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   |  10
2 files changed, 62 insertions(+), 5 deletions(-)
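For orientation, the sketch below exercises the two Python methods added by this patch. It is not part of the commit; the SparkSession setup, the /tmp checkpoint directory, and the sample rows are illustrative assumptions.

# Usage sketch (not part of this patch): the session, checkpoint directory and
# sample data below are assumptions for illustration only.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[2]").appName("spark-18493-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # required before checkpoint()

sdf = spark.createDataFrame([Row(name='Tom', time=1479441846),
                             Row(name='Bob', time=1479442946)])

# withWatermark: declare 'time' (cast to timestamp) as the event-time column and
# tolerate records arriving up to 10 minutes late.
watermarked = sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
print(watermarked)                 # DataFrame[name: string, time: timestamp]

# checkpoint: eagerly materialize the DataFrame and truncate its logical plan.
checkpointed = sdf.checkpoint(eager=True)
print(checkpointed.count())        # 2

spark.stop()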
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3899890083..6fe6226432 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -322,6 +322,54 @@ class DataFrame(object):
def __repr__(self):
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
+ @since(2.1)
+ def checkpoint(self, eager=True):
+ """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+ logical plan of this DataFrame, which is especially useful in iterative algorithms where the
+ plan may grow exponentially. It will be saved to files inside the checkpoint
+ directory set with L{SparkContext.setCheckpointDir()}.
+
+ :param eager: Whether to checkpoint this DataFrame immediately
+
+ .. note:: Experimental
+ """
+ jdf = self._jdf.checkpoint(eager)
+ return DataFrame(jdf, self.sql_ctx)
+
+ @since(2.1)
+ def withWatermark(self, eventTime, delayThreshold):
+ """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
+ in time before which we assume no more late data is going to arrive.
+
+ Spark will use this watermark for several purposes:
+ - To know when a given time window aggregation can be finalized and thus can be emitted
+ when using output modes that do not allow updates.
+
+ - To minimize the amount of state that we need to keep for on-going aggregations.
+
+ The current watermark is computed by looking at the `MAX(eventTime)` seen across
+ all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
+ of coordinating this value across partitions, the actual watermark used is only guaranteed
+ to be at least `delayThreshold` behind the actual event time. In some cases we may still
+ process records that arrive more than `delayThreshold` late.
+
+ :param eventTime: the name of the column that contains the event time of the row.
+ :param delayThreshold: the minimum delay to wait for data to arrive late, relative to the
+ latest record that has been processed in the form of an interval
+ (e.g. "1 minute" or "5 hours").
+
+ .. note:: Experimental
+
+ >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+ DataFrame[name: string, time: timestamp]
+ """
+ if not eventTime or type(eventTime) is not str:
+ raise TypeError("eventTime should be provided as a string")
+ if not delayThreshold or type(delayThreshold) is not str:
+ raise TypeError("delayThreshold should be provided as a string interval")
+ jdf = self._jdf.withWatermark(eventTime, delayThreshold)
+ return DataFrame(jdf, self.sql_ctx)
+
@since(1.3)
def count(self):
"""Returns the number of rows in this :class:`DataFrame`.
@@ -1626,6 +1674,7 @@ def _test():
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
import pyspark.sql.dataframe
+ from pyspark.sql.functions import from_unixtime
globs = pyspark.sql.dataframe.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
@@ -1638,9 +1687,11 @@ def _test():
globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
Row(name='Bob', age=5)]).toDF()
globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
- Row(name='Bob', age=5, height=None),
- Row(name='Tom', age=None, height=None),
- Row(name=None, age=None, height=None)]).toDF()
+ Row(name='Bob', age=5, height=None),
+ Row(name='Tom', age=None, height=None),
+ Row(name=None, age=None, height=None)]).toDF()
+ globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
+ Row(name='Bob', time=1479442946)]).toDF()
(failure_count, test_count) = doctest.testmod(
pyspark.sql.dataframe, globs=globs,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c75a6a45e..7ba6ffce27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -485,7 +485,10 @@ class Dataset[T] private[sql](
def isStreaming: Boolean = logicalPlan.isStreaming
/**
- * Returns a checkpointed version of this Dataset.
+ * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
+ * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+ * plan may grow exponentially. It will be saved to files inside the checkpoint
+ * directory set with `SparkContext#setCheckpointDir`.
*
* @group basic
* @since 2.1.0
@@ -495,7 +498,10 @@ class Dataset[T] private[sql](
def checkpoint(): Dataset[T] = checkpoint(eager = true)
/**
- * Returns a checkpointed version of this Dataset.
+ * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+ * logical plan of this Dataset, which is especially useful in iterative algorithms where the
+ * plan may grow exponentially. It will be saved to files inside the checkpoint
+ * directory set with `SparkContext#setCheckpointDir`.
*
* @group basic
* @since 2.1.0
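The Scala doc changes above distinguish the eager overload, `checkpoint()`, from the parameterized `checkpoint(eager)`; on the Python side the same choice is exposed through the `eager` flag. A brief sketch, reusing the hypothetical `sdf` DataFrame from the example after the diffstat:

# Eager checkpoint: the checkpoint files are written immediately.
eager_df = sdf.checkpoint(eager=True)

# Lazy checkpoint: the plan is marked for checkpointing, but the files are only
# written the first time the DataFrame is actually computed (e.g. by count()).
lazy_df = sdf.checkpoint(eager=False)
lazy_df.count()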