From 4b4e329e49f8af28fa6301bd06c48d7097eaf9e6 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Tue, 30 Aug 2016 09:15:31 -0700 Subject: [SPARK-5682][CORE] Add encrypted shuffle in spark This patch is using Apache Commons Crypto library to enable shuffle encryption support. Author: Ferdinand Xu Author: kellyzly Closes #8880 from winningsix/SPARK-10771. --- .../spark/security/CryptoStreamUtilsSuite.scala | 107 +++++++++++++++++++++ .../sort/BypassMergeSortShuffleWriterSuite.scala | 2 +- 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala (limited to 'core/src/test/scala/org/apache') diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala new file mode 100644 index 0000000000..81eb907ac7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.security + +import java.security.PrivilegedExceptionAction + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.security.CryptoStreamUtils._ + +class CryptoStreamUtilsSuite extends SparkFunSuite { + val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup")) + + test("Crypto configuration conversion") { + val sparkKey1 = s"${SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX}a.b.c" + val sparkVal1 = "val1" + val cryptoKey1 = s"${COMMONS_CRYPTO_CONF_PREFIX}a.b.c" + + val sparkKey2 = SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.stripSuffix(".") + "A.b.c" + val sparkVal2 = "val2" + val cryptoKey2 = s"${COMMONS_CRYPTO_CONF_PREFIX}A.b.c" + val conf = new SparkConf() + conf.set(sparkKey1, sparkVal1) + conf.set(sparkKey2, sparkVal2) + val props = CryptoStreamUtils.toCryptoConf(conf) + assert(props.getProperty(cryptoKey1) === sparkVal1) + assert(!props.containsKey(cryptoKey2)) + } + + test("Shuffle encryption is disabled by default") { + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val credentials = UserGroupInformation.getCurrentUser.getCredentials() + val conf = new SparkConf() + initCredentials(conf, credentials) + assert(credentials.getSecretKey(SPARK_IO_TOKEN) === null) + } + }) + } + + test("Shuffle encryption key length should be 128 by default") { + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val credentials = UserGroupInformation.getCurrentUser.getCredentials() + val conf = new SparkConf() + conf.set(IO_ENCRYPTION_ENABLED, true) + initCredentials(conf, credentials) + var key = credentials.getSecretKey(SPARK_IO_TOKEN) + assert(key !== null) + val actual = key.length * (java.lang.Byte.SIZE) + assert(actual === 128) + } + }) + } + + test("Initial credentials with key length in 256") { + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val credentials = UserGroupInformation.getCurrentUser.getCredentials() + val conf = new SparkConf() + conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 256) + conf.set(IO_ENCRYPTION_ENABLED, true) + initCredentials(conf, credentials) + var key = credentials.getSecretKey(SPARK_IO_TOKEN) + assert(key !== null) + val actual = key.length * (java.lang.Byte.SIZE) + assert(actual === 256) + } + }) + } + + test("Initial credentials with invalid key length") { + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val credentials = UserGroupInformation.getCurrentUser.getCredentials() + val conf = new SparkConf() + conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 328) + conf.set(IO_ENCRYPTION_ENABLED, true) + val thrown = intercept[IllegalArgumentException] { + initCredentials(conf, credentials) + } + } + }) + } + + private[this] def initCredentials(conf: SparkConf, credentials: Credentials): Unit = { + if (conf.get(IO_ENCRYPTION_ENABLED)) { + SecurityManager.initIOEncryptionKey(conf, credentials) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 5132384a5e..ed9428820f 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -94,7 +94,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte args(1).asInstanceOf[File], args(2).asInstanceOf[SerializerInstance], args(3).asInstanceOf[Int], - compressStream = identity, + wrapStream = identity, syncWrites = false, args(4).asInstanceOf[ShuffleWriteMetrics], blockId = args(0).asInstanceOf[BlockId] -- cgit v1.2.3