path: root/core/src
author    Hossein <hossein@databricks.com>    2016-10-21 12:38:52 -0700
committer Felix Cheung <felixcheung@apache.org>    2016-10-21 12:38:52 -0700
commit    e371040a0150e4ed748a7c25465965840b61ca63 (patch)
tree      3ec1992dfbb0487b318daf425cfa10229a02c99b /core/src
parent    e21e1c946c4b7448fb150cfa2d9419864ae6f9b5 (diff)
[SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns
## What changes were proposed in this pull request?

NA date values are serialized as "NA" and NA time values are serialized as NaN from R. The backend did not have proper logic to deal with them, so we got an IllegalArgumentException for Date and a wrong value for time. This PR adds support for deserializing NA as Date and Time.

## How was this patch tested?

* [x] TODO

Author: Hossein <hossein@databricks.com>

Closes #15421 from falaki/SPARK-17811.
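For illustration, here is a minimal standalone Scala sketch of the null-handling rules this patch introduces. It works on already-decoded values rather than the SerDe byte stream, and the helper names (`parseRDate`, `parseRTime`) are hypothetical, not part of SerDe:

```scala
import java.sql.{Date, Timestamp}

// Hypothetical helpers mirroring the rules the patch adds to SerDe.readDate/readTime.
// They take already-decoded values instead of reading from the R socket stream.
object RNullHandlingSketch {
  // R serializes a missing Date as the string "NA".
  def parseRDate(s: String): Date =
    if (s == "NA") null else Date.valueOf(s)

  // R serializes a missing time value as NaN seconds since the epoch.
  def parseRTime(seconds: Double): Timestamp =
    if (java.lang.Double.isNaN(seconds)) {
      null
    } else {
      val sec = Math.floor(seconds).toLong
      val t = new Timestamp(sec * 1000L)
      t.setNanos(((seconds - sec) * 1e9).toInt)
      t
    }

  def main(args: Array[String]): Unit = {
    println(parseRDate("NA"))          // null instead of an IllegalArgumentException
    println(parseRDate("2016-10-21"))  // 2016-10-21
    println(parseRTime(Double.NaN))    // null instead of a bogus Timestamp
    println(parseRTime(1477078732.5))  // a Timestamp with 500 ms of fractional seconds
  }
}
```

The actual patch additionally catches NegativeArraySizeException because, with some R versions, deserializing an NA string fails at the stream level (tracked as SPARK-18011); this sketch omits that stream-level handling.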
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala | 31
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index e4932a4192..550e075a95 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,15 +125,34 @@ private[spark] object SerDe {
   }
 
   def readDate(in: DataInputStream): Date = {
-    Date.valueOf(readString(in))
+    try {
+      val inStr = readString(in)
+      if (inStr == "NA") {
+        null
+      } else {
+        Date.valueOf(inStr)
+      }
+    } catch {
+      // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+      case _: NegativeArraySizeException => null
+    }
   }
 
   def readTime(in: DataInputStream): Timestamp = {
-    val seconds = in.readDouble()
-    val sec = Math.floor(seconds).toLong
-    val t = new Timestamp(sec * 1000L)
-    t.setNanos(((seconds - sec) * 1e9).toInt)
-    t
+    try {
+      val seconds = in.readDouble()
+      if (java.lang.Double.isNaN(seconds)) {
+        null
+      } else {
+        val sec = Math.floor(seconds).toLong
+        val t = new Timestamp(sec * 1000L)
+        t.setNanos(((seconds - sec) * 1e9).toInt)
+        t
+      }
+    } catch {
+      // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+      case _: NegativeArraySizeException => null
+    }
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
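As a rough sanity check of the new readTime path (the real coverage for the "TODO" above belongs in SparkR's test suite), a small Scala sketch along these lines could exercise it directly. It assumes the file is compiled inside the org.apache.spark.api.r package so the private[spark] SerDe object is visible, and relies only on readTime's wire format being a single raw double, as shown in the diff:

```scala
package org.apache.spark.api.r

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Rough standalone check of the patched readTime: a NaN on the wire (how R encodes
// a missing time value) should now come back as null instead of a garbage Timestamp.
object ReadTimeNullSketch {
  // Build a DataInputStream containing whatever the writer callback emits.
  private def streamOf(write: DataOutputStream => Unit): DataInputStream = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    write(out)
    out.flush()
    new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
  }

  def main(args: Array[String]): Unit = {
    // NA time: NaN seconds deserialize to null after this patch.
    assert(SerDe.readTime(streamOf(_.writeDouble(Double.NaN))) == null)

    // A regular value still becomes a Timestamp with the fractional part in nanos.
    val ts = SerDe.readTime(streamOf(_.writeDouble(1477078732.5)))
    assert(ts != null && ts.getNanos == 500000000)
  }
}
```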