diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 56 | ||||
-rw-r--r-- | python/pyspark/sql/streaming.py | 30 |
2 files changed, 66 insertions, 20 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 64de33e8ec..3da6f497e9 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None): """ Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects (one object per record) and returns the result as a :class`DataFrame`. @@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils): ``spark.sql.columnNameOfCorruptRecord``. If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes @@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils): allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, + timestampFormat=timestampFormat) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils): is set, it uses the default value, ``Inf``. :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This - applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils): header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): path = [path] @@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils): self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None, compression=None): + def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils): :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) - self._set_opts(compression=compression) + self._set_opts( + compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat) self._jwrite.json(path) @since(1.4) @@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, - header=None, nullValue=None, escapeQuotes=None, quoteAll=None): + header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, + timestampFormat=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils): the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, - nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll) + nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, + dateFormat=dateFormat, timestampFormat=timestampFormat) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index a364555003..3761d2b199 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None, + timestampFormat=None): """ Loads a JSON file stream (one object per line) and returns a :class`DataFrame`. @@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils): ``spark.sql.columnNameOfCorruptRecord``. If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming @@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils): allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, + timestampFormat=timestampFormat) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: @@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils): is set, it uses the default value, ``Inf``. :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This - applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils): header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) |