author     sumansomasundar <suman.somasundar@oracle.com>   2016-10-04 10:31:56 +0100
committer  Sean Owen <sowen@cloudera.com>                  2016-10-04 10:31:56 +0100
commit     7d5160883542f3d9dcb3babda92880985398e9af (patch)
tree       d328424f28f76a1b1f46e5bf55b9ffb43988cf89
parent     8e8de0073d71bb00baeb24c612d7841b6274f652 (diff)
[SPARK-16962][CORE][SQL] Fix misaligned record accesses for SPARC architectures
## What changes were proposed in this pull request?

Made changes to record length offsets to make them uniform throughout various areas of Spark core and unsafe.

## How was this patch tested?

This change affects only SPARC architectures; it was also tested on x86 architectures to check for regressions.

Author: sumansomasundar <suman.somasundar@oracle.com>

Closes #14762 from sumansomasundar/master.
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java                                | 58
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java                             | 31
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java                                         | 57
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java                   | 19
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java                   | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala | 11
6 files changed, 144 insertions(+), 46 deletions(-)
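
As background for the diffs below, here is a minimal standalone illustration (not part of the patch) of the underlying problem: a 4-byte length header pushes every 8-byte field that follows onto an address that is 4 mod 8. SPARC faults on such a read, while x86 merely tolerates it, which is why the patch keys the header width off `Platform.unaligned()`.

```java
// Illustration only (not from the patch): why a 4-byte record-length header
// breaks 8-byte alignment. With an 8-byte-aligned record base, the payload
// after a 4-byte header starts at base + 4, which SPARC cannot read as a
// long; an 8-byte header keeps the payload aligned.
public class AlignmentDemo {
  public static void main(String[] args) {
    long base = 0x1000L;                      // assume an 8-byte-aligned base
    long afterIntHeader = base + 4;           // 0x1004: misaligned for a long
    long afterLongHeader = base + 8;          // 0x1008: still 8-byte aligned
    System.out.println(afterIntHeader % 8);   // 4 -> would fault on SPARC
    System.out.println(afterLongHeader % 8);  // 0 -> safe everywhere
  }
}
```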
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
new file mode 100644
index 0000000000..be62e40412
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe;
+
+/**
+ * Class to make record length offsets uniform throughout various
+ * areas of Apache Spark core and unsafe. The SPARC platform requires
+ * this because using a 4-byte int for record lengths causes the entire
+ * record of 8-byte items to become misaligned by 4 bytes. Using an
+ * 8-byte long for the record length keeps everything 8-byte aligned.
+ */
+public class UnsafeAlignedOffset {
+
+ private static final int UAO_SIZE = Platform.unaligned() ? 4 : 8;
+
+ public static int getUaoSize() {
+ return UAO_SIZE;
+ }
+
+ public static int getSize(Object object, long offset) {
+ switch (UAO_SIZE) {
+ case 4:
+ return Platform.getInt(object, offset);
+ case 8:
+ return (int)Platform.getLong(object, offset);
+ default:
+ throw new AssertionError("Illegal UAO_SIZE");
+ }
+ }
+
+ public static void putSize(Object object, long offset, int value) {
+ switch (UAO_SIZE) {
+ case 4:
+ Platform.putInt(object, offset, value);
+ break;
+ case 8:
+ Platform.putLong(object, offset, value);
+ break;
+ default:
+ throw new AssertionError("Illegal UAO_SIZE");
+ }
+ }
+}
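
A short usage sketch of the new class (not part of the commit); it assumes the spark-unsafe artifact is on the classpath. The same call sites then work with 4-byte headers on x86 and 8-byte headers on SPARC.

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;

public class UaoExample {
  public static void main(String[] args) {
    byte[] page = new byte[64];
    long offset = Platform.BYTE_ARRAY_OFFSET;  // base offset of byte[] data
    // Write and read back a record length; the header is 4 bytes where
    // unaligned access is supported, 8 bytes otherwise.
    UnsafeAlignedOffset.putSize(page, offset, 42);
    int len = UnsafeAlignedOffset.getSize(page, offset);
    System.out.println(len + " (header width: "
        + UnsafeAlignedOffset.getUaoSize() + " bytes)");
  }
}
```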
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index cf42877bf9..9c551ab19e 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -40,6 +40,7 @@ public class ByteArrayMethods {
}
}
+ private static final boolean unaligned = Platform.unaligned();
/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
@@ -47,17 +48,33 @@ public class ByteArrayMethods {
public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;
- while (i <= length - 8) {
- if (Platform.getLong(leftBase, leftOffset + i) !=
- Platform.getLong(rightBase, rightOffset + i)) {
- return false;
+
+ // check if stars align and we can get both offsets to be aligned
+ if ((leftOffset % 8) == (rightOffset % 8)) {
+ while ((leftOffset + i) % 8 != 0 && i < length) {
+ if (Platform.getByte(leftBase, leftOffset + i) !=
+ Platform.getByte(rightBase, rightOffset + i)) {
+ return false;
+ }
+ i += 1;
+ }
+ }
+ // for architectures that support unaligned accesses, chew it up 8 bytes at a time
+ if (unaligned || (((leftOffset + i) % 8 == 0) && ((rightOffset + i) % 8 == 0))) {
+ while (i <= length - 8) {
+ if (Platform.getLong(leftBase, leftOffset + i) !=
+ Platform.getLong(rightBase, rightOffset + i)) {
+ return false;
+ }
+ i += 8;
}
- i += 8;
}
+ // this finishes off the tail of the 8-byte loop, or does the entire
+ // comparison byte by byte, whichever is needed.
while (i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
- Platform.getByte(rightBase, rightOffset + i)) {
- return false;
+ Platform.getByte(rightBase, rightOffset + i)) {
+ return false;
}
i += 1;
}
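
The arithmetic behind the "stars align" check, as a standalone sketch (illustrative, not from the patch): a byte-wise prologue can only bring both cursors to 8-byte alignment when the two offsets share the same remainder mod 8; otherwise aligning one side would unalign the other, and the code falls back to unaligned long reads or byte-wise comparison.

```java
public class StarsAlign {
  public static void main(String[] args) {
    long leftOffset = 13, rightOffset = 21;   // 13 % 8 == 5 == 21 % 8
    boolean canAlign = (leftOffset % 8) == (rightOffset % 8);
    // Bytes the prologue compares one at a time before 8-byte reads start.
    long prologue = canAlign ? (8 - leftOffset % 8) % 8 : 0;
    System.out.println(canAlign + ", prologue bytes = " + prologue); // true, 3
  }
}
```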
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index e4289818f1..d2fcdea4f2 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,6 +35,7 @@ import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
@@ -273,8 +274,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
currentPage = dataPages.get(nextIdx);
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
- recordsInPage = Platform.getInt(pageBaseObject, offsetInPage);
- offsetInPage += 4;
+ recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
+ offsetInPage += UnsafeAlignedOffset.getUaoSize();
} else {
currentPage = null;
if (reader != null) {
@@ -321,10 +322,10 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
numRecords--;
if (currentPage != null) {
- int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
+ int totalLength = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
// [total size] [key size] [key] [value] [pointer to next]
- offsetInPage += 4 + totalLength + 8;
+ offsetInPage += UnsafeAlignedOffset.getUaoSize() + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -367,14 +368,15 @@ public final class BytesToBytesMap extends MemoryConsumer {
Object base = block.getBaseObject();
long offset = block.getBaseOffset();
- int numRecords = Platform.getInt(base, offset);
- offset += 4;
+ int numRecords = UnsafeAlignedOffset.getSize(base, offset);
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ offset += uaoSize;
final UnsafeSorterSpillWriter writer =
new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
while (numRecords > 0) {
- int length = Platform.getInt(base, offset);
- writer.write(base, offset + 4, length, 0);
- offset += 4 + length + 8;
+ int length = UnsafeAlignedOffset.getSize(base, offset);
+ writer.write(base, offset + uaoSize, length, 0);
+ offset += uaoSize + length + 8;
numRecords--;
}
writer.close();
@@ -530,13 +532,14 @@ public final class BytesToBytesMap extends MemoryConsumer {
private void updateAddressesAndSizes(final Object base, long offset) {
baseObject = base;
- final int totalLength = Platform.getInt(base, offset);
- offset += 4;
- keyLength = Platform.getInt(base, offset);
- offset += 4;
+ final int totalLength = UnsafeAlignedOffset.getSize(base, offset);
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ offset += uaoSize;
+ keyLength = UnsafeAlignedOffset.getSize(base, offset);
+ offset += uaoSize;
keyOffset = offset;
valueOffset = offset + keyLength;
- valueLength = totalLength - keyLength - 4;
+ valueLength = totalLength - keyLength - uaoSize;
}
private Location with(int pos, int keyHashcode, boolean isDefined) {
@@ -565,10 +568,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
this.isDefined = true;
this.memoryPage = null;
baseObject = base;
- keyOffset = offset + 4;
- keyLength = Platform.getInt(base, offset);
- valueOffset = offset + 4 + keyLength;
- valueLength = length - 4 - keyLength;
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ keyOffset = offset + uaoSize;
+ keyLength = UnsafeAlignedOffset.getSize(base, offset);
+ valueOffset = offset + uaoSize + keyLength;
+ valueLength = length - uaoSize - keyLength;
return this;
}
@@ -699,9 +703,10 @@ public final class BytesToBytesMap extends MemoryConsumer {
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (total length) (key length) (key) (value) (8 byte pointer to next value)
- final long recordLength = 8 + klen + vlen + 8;
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final long recordLength = (2 * uaoSize) + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
- if (!acquireNewPage(recordLength + 4L)) {
+ if (!acquireNewPage(recordLength + uaoSize)) {
return false;
}
}
@@ -710,9 +715,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
- Platform.putInt(base, offset, klen + vlen + 4);
- Platform.putInt(base, offset + 4, klen);
- offset += 8;
+ UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
+ UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
+ offset += (2 * uaoSize);
Platform.copyMemory(kbase, koff, base, offset, klen);
offset += klen;
Platform.copyMemory(vbase, voff, base, offset, vlen);
@@ -722,7 +727,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
// --- Update bookkeeping data structures ----------------------------------------------------
offset = currentPage.getBaseOffset();
- Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
+ UnsafeAlignedOffset.putSize(base, offset, UnsafeAlignedOffset.getSize(base, offset) + 1);
pageCursor += recordLength;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
@@ -757,8 +762,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
return false;
}
dataPages.add(currentPage);
- Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
- pageCursor = 4;
+ UnsafeAlignedOffset.putSize(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
+ pageCursor = UnsafeAlignedOffset.getUaoSize();
return true;
}
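
To make the new header arithmetic concrete, a layout sketch (illustrative, not part of the commit) for one BytesToBytesMap record, where uaoSize is 4 on x86 and 8 on SPARC:

```java
// Record layout after this change:
//   [uao total length][uao key length][key bytes][value bytes][8-byte next pointer]
public class RecordLayout {
  public static void main(String[] args) {
    int uaoSize = 8;                                     // pretend we are on SPARC
    int klen = 16, vlen = 24;
    long recordLength = 2L * uaoSize + klen + vlen + 8;  // space acquired in the page
    int totalLength = klen + vlen + uaoSize;             // stored in the first header
    long keyOffset = 2L * uaoSize;                       // relative to record start
    long valueOffset = keyOffset + klen;
    // valueLength is recovered exactly as Location.updateAddressesAndSizes does:
    int valueLength = totalLength - klen - uaoSize;
    System.out.println(recordLength + " " + valueOffset + " " + valueLength);
  }
}
```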
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8ca29a58f8..428ff72e71 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -34,6 +34,7 @@ import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.TaskCompletionListener;
@@ -392,14 +393,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
growPointerArrayIfNecessary();
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
- final int required = length + 4;
+ final int required = length + uaoSize;
acquireNewPageIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
- Platform.putInt(base, pageCursor, length);
- pageCursor += 4;
+ UnsafeAlignedOffset.putSize(base, pageCursor, length);
+ pageCursor += uaoSize;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
@@ -418,15 +420,16 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
throws IOException {
growPointerArrayIfNecessary();
- final int required = keyLen + valueLen + 4 + 4;
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final int required = keyLen + valueLen + (2 * uaoSize);
acquireNewPageIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
- Platform.putInt(base, pageCursor, keyLen + valueLen + 4);
- pageCursor += 4;
- Platform.putInt(base, pageCursor, keyLen);
- pageCursor += 4;
+ UnsafeAlignedOffset.putSize(base, pageCursor, keyLen + valueLen + uaoSize);
+ pageCursor += uaoSize;
+ UnsafeAlignedOffset.putSize(base, pageCursor, keyLen);
+ pageCursor += uaoSize;
Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen);
pageCursor += keyLen;
Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen);
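
A sketch (illustrative) of the two space calculations the sorter now performs, with uaoSize-wide length headers replacing the hard-coded 4-byte ints:

```java
import org.apache.spark.unsafe.UnsafeAlignedOffset;

public class SorterSpace {
  // Bytes needed for a plain record, mirroring insertRecord:
  // [uao record length][record bytes]
  static int requiredFor(int length) {
    return length + UnsafeAlignedOffset.getUaoSize();
  }

  // Bytes needed for a key/value record, mirroring insertKVRecord:
  // [uao total length][uao key length][key bytes][value bytes],
  // where the first header stores keyLen + valueLen + uaoSize.
  static int requiredForKV(int keyLen, int valueLen) {
    return keyLen + valueLen + 2 * UnsafeAlignedOffset.getUaoSize();
  }
}
```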
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 8ecd20910a..2a71e68ada 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -25,6 +25,7 @@ import org.apache.avro.reflect.Nullable;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;
@@ -56,11 +57,14 @@ public final class UnsafeInMemorySorter {
@Override
public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
if (prefixComparisonResult == 0) {
final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
- final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + 4; // skip length
+ // skip length
+ final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
- final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + 4; // skip length
+ // skip length
+ final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);
} else {
return prefixComparisonResult;
@@ -282,9 +286,11 @@ public final class UnsafeInMemorySorter {
// This pointer points to a uao-size record length, followed by the record's bytes
final long recordPointer = array.get(offset + position);
currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);
+ int uaoSize = UnsafeAlignedOffset.getUaoSize();
baseObject = memoryManager.getPage(recordPointer);
- baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
- recordLength = Platform.getInt(baseObject, baseOffset - 4);
+ // Skip over record length
+ baseOffset = memoryManager.getOffsetInPage(recordPointer) + uaoSize;
+ recordLength = UnsafeAlignedOffset.getSize(baseObject, baseOffset - uaoSize);
keyPrefix = array.get(offset + position + 1);
position += 2;
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
index 0f4680e502..d1fece05a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder}
import org.apache.spark.sql.types.AtomicType
+import org.apache.spark.unsafe.Platform
/**
* A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of
@@ -61,8 +62,12 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
super.initialize(initialSize, columnName, useCompression)
}
+ // The various compression schemes, while saving memory, cause all of the data within
+ // the row to become unaligned, which crashes SPARC. Until a way is found to fix the
+ // compression schemes while preserving alignment, compression must be disabled on
+ // platforms that do not support unaligned accesses.
+
protected def isWorthCompressing(encoder: Encoder[T]) = {
- encoder.compressionRatio < 0.8
+ CompressibleColumnBuilder.unaligned && encoder.compressionRatio < 0.8
}
private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
@@ -103,3 +108,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
encoder.compress(nonNullBuffer, compressedBuffer)
}
}
+
+private[columnar] object CompressibleColumnBuilder {
+ val unaligned = Platform.unaligned()
+}
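
Finally, the gating pattern introduced above, restated as a minimal Java sketch (the class name here is hypothetical): cache the platform capability once, then require it before enabling any feature that produces misaligned data.

```java
import org.apache.spark.unsafe.Platform;

public class AlignmentGate {
  // Cache the capability once, like the companion object above.
  private static final boolean UNALIGNED = Platform.unaligned();

  // Compression is only worth doing where unaligned accesses are legal;
  // on SPARC a smaller buffer is not worth a crash.
  static boolean isWorthCompressing(double compressionRatio) {
    return UNALIGNED && compressionRatio < 0.8;
  }
}
```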