diff options
author | Reynold Xin <rxin@databricks.com> | 2015-10-08 17:25:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-08 17:25:14 -0700 |
commit | 84ea287178247c163226e835490c9c70b17d8d3b (patch) | |
tree | 1e01cf3ce4db65842b0685d55e954e089a8ddf68 /sql/catalyst/src | |
parent | 02149ff08eed3745086589a047adbce9a580389f (diff) | |
download | spark-84ea287178247c163226e835490c9c70b17d8d3b.tar.gz spark-84ea287178247c163226e835490c9c70b17d8d3b.tar.bz2 spark-84ea287178247c163226e835490c9c70b17d8d3b.zip |
[SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.
UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object, a base offset, and length). When the row is serialized with Java/Kryo serialization, the object layout in memory can change if two machines have different pointer width (Oops in JVM).
To reproduce, launch Spark using
MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"
And then run the following
scala> sql("select 1 xx").collect()
Author: Reynold Xin <rxin@databricks.com>
Closes #9030 from rxin/SPARK-10914.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- | sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 47 |
1 files changed, 44 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index e8ac2999c2..5af7ed5d6e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; @@ -26,6 +25,11 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; @@ -35,6 +39,7 @@ import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import static org.apache.spark.sql.types.DataTypes.*; +import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; /** * An Unsafe implementation of Row which is backed by raw memory instead of Java objects. @@ -52,7 +57,7 @@ import static org.apache.spark.sql.types.DataTypes.*; * * Instances of `UnsafeRow` act as pointers to row data stored in this format. */ -public final class UnsafeRow extends MutableRow { +public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable { ////////////////////////////////////////////////////////////////////////////// // Static methods @@ -596,4 +601,40 @@ public final class UnsafeRow extends MutableRow { public void writeToMemory(Object target, long targetOffset) { Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes); } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + byte[] bytes = getBytes(); + out.writeInt(bytes.length); + out.writeInt(this.numFields); + out.write(bytes); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + this.baseOffset = BYTE_ARRAY_OFFSET; + this.sizeInBytes = in.readInt(); + this.numFields = in.readInt(); + this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); + this.baseObject = new byte[sizeInBytes]; + in.readFully((byte[]) baseObject); + } + + @Override + public void write(Kryo kryo, Output out) { + byte[] bytes = getBytes(); + out.writeInt(bytes.length); + out.writeInt(this.numFields); + out.write(bytes); + } + + @Override + public void read(Kryo kryo, Input in) { + this.baseOffset = BYTE_ARRAY_OFFSET; + this.sizeInBytes = in.readInt(); + this.numFields = in.readInt(); + this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); + this.baseObject = new byte[sizeInBytes]; + in.read((byte[]) baseObject); + } } |