aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/SerDe.scala31
1 files changed, 25 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index e4932a4192..550e075a95 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,15 +125,34 @@ private[spark] object SerDe {
}
def readDate(in: DataInputStream): Date = {
- Date.valueOf(readString(in))
+ try {
+ val inStr = readString(in)
+ if (inStr == "NA") {
+ null
+ } else {
+ Date.valueOf(inStr)
+ }
+ } catch {
+ // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+ case _: NegativeArraySizeException => null
+ }
}
def readTime(in: DataInputStream): Timestamp = {
- val seconds = in.readDouble()
- val sec = Math.floor(seconds).toLong
- val t = new Timestamp(sec * 1000L)
- t.setNanos(((seconds - sec) * 1e9).toInt)
- t
+ try {
+ val seconds = in.readDouble()
+ if (java.lang.Double.isNaN(seconds)) {
+ null
+ } else {
+ val sec = Math.floor(seconds).toLong
+ val t = new Timestamp(sec * 1000L)
+ t.setNanos(((seconds - sec) * 1e9).toInt)
+ t
+ }
+ } catch {
+ // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+ case _: NegativeArraySizeException => null
+ }
}
def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {