aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPete Robbins <robbinspg@gmail.com>2016-05-02 13:16:46 -0700
committerDavies Liu <davies.liu@gmail.com>2016-05-02 13:16:46 -0700
commit8a1ce4899fb9f751dedaaa34ea654dfbc8330852 (patch)
treecd364ceca2e351b13464ab6e0c9fc19a7d3d2335
parent95e372141a102f933045fe9472bbe1ce8c91b5d5 (diff)
downloadspark-8a1ce4899fb9f751dedaaa34ea654dfbc8330852.tar.gz
spark-8a1ce4899fb9f751dedaaa34ea654dfbc8330852.tar.bz2
spark-8a1ce4899fb9f751dedaaa34ea654dfbc8330852.zip
[SPARK-13745] [SQL] Support columnar in memory representation on Big Endian platforms
## What changes were proposed in this pull request? parquet datasource and ColumnarBatch tests fail on big-endian platforms This patch adds support for the little-endian byte arrays being correctly interpreted on a big-endian platform ## How was this patch tested? Spark test builds ran on big endian z/Linux and regression build on little endian amd64 Author: Pete Robbins <robbinspg@gmail.com> Closes #12397 from robbinspg/master.
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java28
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java54
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java41
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala9
4 files changed, 110 insertions, 22 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 2672e0453b..9475c853a0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.unsafe.Platform;
@@ -31,6 +33,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
private byte[] buffer;
private int offset;
private int bitOffset; // Only used for booleans.
+ private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+
+ private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
public VectorizedPlainValuesReader() {
}
@@ -39,6 +44,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
this.buffer = bytes;
this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+ if (bigEndianPlatform) {
+ byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+ }
}
@Override
@@ -103,6 +111,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
@Override
public final int readInteger() {
int v = Platform.getInt(buffer, offset);
+ if (bigEndianPlatform) {
+ v = java.lang.Integer.reverseBytes(v);
+ }
offset += 4;
return v;
}
@@ -110,6 +121,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
@Override
public final long readLong() {
long v = Platform.getLong(buffer, offset);
+ if (bigEndianPlatform) {
+ v = java.lang.Long.reverseBytes(v);
+ }
offset += 8;
return v;
}
@@ -121,14 +135,24 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
@Override
public final float readFloat() {
- float v = Platform.getFloat(buffer, offset);
+ float v;
+ if (!bigEndianPlatform) {
+ v = Platform.getFloat(buffer, offset);
+ } else {
+ v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
+ }
offset += 4;
return v;
}
@Override
public final double readDouble() {
- double v = Platform.getDouble(buffer, offset);
+ double v;
+ if (!bigEndianPlatform) {
+ v = Platform.getDouble(buffer, offset);
+ } else {
+ v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
+ }
offset += 8;
return v;
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index b190141135..b8dd16227e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.vectorized;
+import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.commons.lang.NotImplementedException;
@@ -28,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
* Column data backed using offheap memory.
*/
public final class OffHeapColumnVector extends ColumnVector {
+
+ private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
// The data stored in these two allocations need to maintain binary compatible. We can
// directly pass this buffer to external components.
private long nulls;
@@ -39,9 +43,7 @@ public final class OffHeapColumnVector extends ColumnVector {
protected OffHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.OFF_HEAP);
- if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
- throw new NotImplementedException("Only little endian is supported.");
- }
+
nulls = 0;
data = 0;
lengthData = 0;
@@ -221,8 +223,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
- null, data + 4 * rowId, count * 4);
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+ null, data + 4 * rowId, count * 4);
+ } else {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ long offset = data + 4 * rowId;
+ for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
+ Platform.putInt(null, offset, java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
+ }
+ }
}
@Override
@@ -259,8 +269,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
- null, data + 8 * rowId, count * 8);
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+ null, data + 8 * rowId, count * 8);
+ } else {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ long offset = data + 8 * rowId;
+ for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
+ Platform.putLong(null, offset, java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
+ }
+ }
}
@Override
@@ -297,8 +315,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
- null, data + rowId * 4, count * 4);
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+ null, data + rowId * 4, count * 4);
+ } else {
+ ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+ long offset = data + 4 * rowId;
+ for (int i = 0; i < count; ++i, offset += 4) {
+ Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+ }
+ }
}
@Override
@@ -336,8 +362,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
null, data + rowId * 8, count * 8);
+ } else {
+ ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+ long offset = data + 8 * rowId;
+ for (int i = 0; i < count; ++i, offset += 8) {
+ Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+ }
+ }
}
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index e97276800d..b1ffe4c210 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.vectorized;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Arrays;
import org.apache.spark.memory.MemoryMode;
@@ -27,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
* and a java array for the values.
*/
public final class OnHeapColumnVector extends ColumnVector {
+
+ private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
// The data stored in these arrays need to maintain binary compatible. We can
// directly pass this buffer to external components.
@@ -211,10 +216,11 @@ public final class OnHeapColumnVector extends ColumnVector {
@Override
public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
- for (int i = 0; i < count; ++i) {
+ for (int i = 0; i < count; ++i, srcOffset += 4) {
intData[i + rowId] = Platform.getInt(src, srcOffset);
- srcIndex += 4;
- srcOffset += 4;
+ if (bigEndianPlatform) {
+ intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + rowId]);
+ }
}
}
@@ -251,10 +257,11 @@ public final class OnHeapColumnVector extends ColumnVector {
@Override
public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
- for (int i = 0; i < count; ++i) {
+ for (int i = 0; i < count; ++i, srcOffset += 8) {
longData[i + rowId] = Platform.getLong(src, srcOffset);
- srcIndex += 8;
- srcOffset += 8;
+ if (bigEndianPlatform) {
+ longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]);
+ }
}
}
@@ -286,8 +293,15 @@ public final class OnHeapColumnVector extends ColumnVector {
@Override
public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
- floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
+ Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+ } else {
+ ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+ for (int i = 0; i < count; ++i) {
+ floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+ }
+ }
}
@Override
@@ -320,8 +334,15 @@ public final class OnHeapColumnVector extends ColumnVector {
@Override
public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
- Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
- Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+ if (!bigEndianPlatform) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
+ Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+ } else {
+ ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+ for (int i = 0; i < count; ++i) {
+ doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+ }
+ }
}
@Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index a63007fc3b..7e576a8657 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.execution.vectorized
import java.nio.charset.StandardCharsets
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -280,6 +282,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
+ if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+ // Ensure array contains Liitle Endian doubles
+ var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
+ Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
+ Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
+ }
+
column.putDoubles(idx, 1, buffer, 8)
column.putDoubles(idx + 1, 1, buffer, 0)
reference += 1.123