aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorFerdinand Xu <cheng.a.xu@intel.com>2016-08-30 09:15:31 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-30 09:15:31 -0700
commit4b4e329e49f8af28fa6301bd06c48d7097eaf9e6 (patch)
tree91ec684d78a76de75097723f82537be3e01a1c28 /core/src/test/scala/org/apache
parent27209252f09ff73c58e60c6df8aaba73b308088c (diff)
downloadspark-4b4e329e49f8af28fa6301bd06c48d7097eaf9e6.tar.gz
spark-4b4e329e49f8af28fa6301bd06c48d7097eaf9e6.tar.bz2
spark-4b4e329e49f8af28fa6301bd06c48d7097eaf9e6.zip
[SPARK-5682][CORE] Add encrypted shuffle in spark
This patch uses the Apache Commons Crypto library to enable shuffle encryption support. Author: Ferdinand Xu <cheng.a.xu@intel.com> Author: kellyzly <kellyzly@126.com> Closes #8880 from winningsix/SPARK-10771.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala107
-rw-r--r--core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala2
2 files changed, 108 insertions, 1 deletion
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
new file mode 100644
index 0000000000..81eb907ac7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.security.PrivilegedExceptionAction
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.security.CryptoStreamUtils._
+
+/**
+ * Tests for CryptoStreamUtils: conversion of Spark-prefixed configuration keys
+ * to commons-crypto keys, and generation of the shuffle IO encryption key.
+ */
+class CryptoStreamUtilsSuite extends SparkFunSuite {
+  // Run credential-related code as an isolated test user so the real login
+  // user's Hadoop credentials are never touched.
+  val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+
+  test("Crypto configuration conversion") {
+    // A key carrying the exact Spark commons-config prefix must be re-prefixed
+    // for commons-crypto.
+    val sparkKey1 = s"${SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX}a.b.c"
+    val sparkVal1 = "val1"
+    val cryptoKey1 = s"${COMMONS_CRYPTO_CONF_PREFIX}a.b.c"
+
+    // A key whose prefix lacks the trailing dot does not match the expected
+    // prefix and must NOT be converted.
+    val sparkKey2 = SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.stripSuffix(".") + "A.b.c"
+    val sparkVal2 = "val2"
+    val cryptoKey2 = s"${COMMONS_CRYPTO_CONF_PREFIX}A.b.c"
+    val conf = new SparkConf()
+    conf.set(sparkKey1, sparkVal1)
+    conf.set(sparkKey2, sparkVal2)
+    val props = CryptoStreamUtils.toCryptoConf(conf)
+    assert(props.getProperty(cryptoKey1) === sparkVal1)
+    assert(!props.containsKey(cryptoKey2))
+  }
+
+  test("Shuffle encryption is disabled by default") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+        val conf = new SparkConf()
+        initCredentials(conf, credentials)
+        // No IO encryption key may be generated unless explicitly enabled.
+        assert(credentials.getSecretKey(SPARK_IO_TOKEN) === null)
+      }
+    })
+  }
+
+  test("Shuffle encryption key length should be 128 by default") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+        val conf = new SparkConf()
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        initCredentials(conf, credentials)
+        // `key` is never reassigned, so it is a val (was `var`).
+        val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+        assert(key !== null)
+        val actual = key.length * java.lang.Byte.SIZE
+        assert(actual === 128)
+      }
+    })
+  }
+
+  test("Initial credentials with key length in 256") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+        val conf = new SparkConf()
+        conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 256)
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        initCredentials(conf, credentials)
+        val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+        assert(key !== null)
+        val actual = key.length * java.lang.Byte.SIZE
+        assert(actual === 256)
+      }
+    })
+  }
+
+  test("Initial credentials with invalid key length") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+        val conf = new SparkConf()
+        // 328 is not a valid AES key size; key generation must fail fast.
+        conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 328)
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        // The exception itself is the assertion; no need to bind it
+        // (the original bound an unused `val thrown`).
+        intercept[IllegalArgumentException] {
+          initCredentials(conf, credentials)
+        }
+      }
+    })
+  }
+
+  /**
+   * Generates and stores an IO encryption key in `credentials`, but only when
+   * IO encryption is enabled in `conf`.
+   */
+  private[this] def initCredentials(conf: SparkConf, credentials: Credentials): Unit = {
+    if (conf.get(IO_ENCRYPTION_ENABLED)) {
+      SecurityManager.initIOEncryptionKey(conf, credentials)
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 5132384a5e..ed9428820f 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -94,7 +94,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
args(1).asInstanceOf[File],
args(2).asInstanceOf[SerializerInstance],
args(3).asInstanceOf[Int],
- compressStream = identity,
+ wrapStream = identity,
syncWrites = false,
args(4).asInstanceOf[ShuffleWriteMetrics],
blockId = args(0).asInstanceOf[BlockId]