diff options
author | Reynold Xin <rxin@databricks.com> | 2016-02-28 17:25:07 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-28 17:25:07 -0800 |
commit | 9e01dcc6446f8648e61062f8afe62589b9d4b5ab (patch) | |
tree | ae3c7015e950de315a490ce58c181671bfd12907 /network/yarn | |
parent | cca79fad66c4315b0ed6de59fd87700a540e6646 (diff) | |
download | spark-9e01dcc6446f8648e61062f8afe62589b9d4b5ab.tar.gz spark-9e01dcc6446f8648e61062f8afe62589b9d4b5ab.tar.bz2 spark-9e01dcc6446f8648e61062f8afe62589b9d4b5ab.zip |
[SPARK-13529][BUILD] Move network/* modules into common/network-*
## What changes were proposed in this pull request?
As the title says, this moves the three modules currently in network/ into common/network-*. This removes one top level, non-user-facing folder.
## How was this patch tested?
Compilation and existing tests. We should run both SBT and Maven.
Author: Reynold Xin <rxin@databricks.com>
Closes #11409 from rxin/SPARK-13529.
Diffstat (limited to 'network/yarn')
3 files changed, 0 insertions, 414 deletions
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml deleted file mode 100644 index 3cb44324f2..0000000000 --- a/network/yarn/pom.xml +++ /dev/null @@ -1,148 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-network-yarn_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project YARN Shuffle Service</name> - <url>http://spark.apache.org/</url> - <properties> - <sbt.project.name>network-yarn</sbt.project.name> - <!-- Make sure all Hadoop dependencies are provided to avoid repackaging. --> - <hadoop.deps.scope>provided</hadoop.deps.scope> - <shuffle.jar>${project.build.directory}/scala-${scala.binary.version}/spark-${project.version}-yarn-shuffle.jar</shuffle.jar> - <shade>org/spark-project/</shade> - </properties> - - <dependencies> - <!-- Core dependencies --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - - <!-- Provided dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${shuffle.jar}</outputFile> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - <relocations> - <relocation> - <pattern>com.fasterxml.jackson</pattern> - <shadedPattern>org.spark-project.com.fasterxml.jackson</shadedPattern> - <includes> - <include>com.fasterxml.jackson.**</include> - </includes> - </relocation> - </relocations> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> - </plugin> - - <!-- probes to validate that those dependencies which must be shaded are --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>verify</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <macrodef name="shaded"> - <attribute name="resource"/> - <sequential> - <fail message="Not found ${shade}@{resource}"> - <condition> - <not> - <resourceexists> - <zipentry zipfile="${shuffle.jar}" name="${shade}@{resource}"/> - </resourceexists> - </not> - </condition> - </fail> - </sequential> - </macrodef> - <echo>Verifying dependency shading</echo> - <shaded resource="com/fasterxml/jackson/core/JsonParser.class" /> - <shaded resource="com/fasterxml/jackson/annotation/JacksonAnnotation.class" /> - <shaded resource="com/fasterxml/jackson/databind/JsonSerializer.class" /> - </target> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java deleted file mode 100644 index ba6d30a74c..0000000000 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.yarn; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.api.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.network.TransportContext; -import org.apache.spark.network.sasl.SaslServerBootstrap; -import org.apache.spark.network.sasl.ShuffleSecretManager; -import org.apache.spark.network.server.TransportServer; -import org.apache.spark.network.server.TransportServerBootstrap; -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; -import org.apache.spark.network.util.TransportConf; -import org.apache.spark.network.yarn.util.HadoopConfigProvider; - -/** - * An external shuffle service used by Spark on Yarn. - * - * This is intended to be a long-running auxiliary service that runs in the NodeManager process. - * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`. - * The application also automatically derives the service port through `spark.shuffle.service.port` - * specified in the Yarn configuration. This is so that both the clients and the server agree on - * the same port to communicate on. - * - * The service also optionally supports authentication. This ensures that executors from one - * application cannot read the shuffle files written by those from another. This feature can be - * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM. - * Note that the Spark application must also set `spark.authenticate` manually and, unlike in - * the case of the service port, will not inherit this setting from the Yarn configuration. This - * is because an application running on the same Yarn cluster may choose to not use the external - * shuffle service, in which case its setting of `spark.authenticate` should be independent of - * the service's. - */ -public class YarnShuffleService extends AuxiliaryService { - private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); - - // Port on which the shuffle server listens for fetch requests - private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; - private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; - - // Whether the shuffle server should authenticate fetch requests - private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; - private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; - - // An entity that manages the shuffle secret per application - // This is used only if authentication is enabled - private ShuffleSecretManager secretManager; - - // The actual server that serves shuffle files - private TransportServer shuffleServer = null; - - // Handles registering executors and opening shuffle blocks - @VisibleForTesting - ExternalShuffleBlockHandler blockHandler; - - // Where to store & reload executor info for recovering state after an NM restart - @VisibleForTesting - File registeredExecutorFile; - - // just for testing when you want to find an open port - @VisibleForTesting - static int boundPort = -1; - - // just for integration tests that want to look at this file -- in general not sensible as - // a static - @VisibleForTesting - static YarnShuffleService instance; - - public YarnShuffleService() { - super("spark_shuffle"); - logger.info("Initializing YARN shuffle service for Spark"); - instance = this; - } - - /** - * Return whether authentication is enabled as specified by the configuration. - * If so, fetch requests will fail unless the appropriate authentication secret - * for the application is provided. - */ - private boolean isAuthenticationEnabled() { - return secretManager != null; - } - - /** - * Start the shuffle server with the given configuration. - */ - @Override - protected void serviceInit(Configuration conf) { - - // In case this NM was killed while there were running spark applications, we need to restore - // lost state for the existing executors. We look for an existing file in the NM's local dirs. - // If we don't find one, then we choose a file to use to save the state next time. Even if - // an application was stopped while the NM was down, we expect yarn to call stopApplication() - // when it comes back - registeredExecutorFile = - findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs")); - - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); - // If authentication is enabled, set up the shuffle server to use a - // special RPC handler that filters out unauthenticated fetch requests - boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); - try { - blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - } catch (Exception e) { - logger.error("Failed to initialize external shuffle service", e); - } - - List<TransportServerBootstrap> bootstraps = Lists.newArrayList(); - if (authEnabled) { - secretManager = new ShuffleSecretManager(); - bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); - } - - int port = conf.getInt( - SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportContext transportContext = new TransportContext(transportConf, blockHandler); - shuffleServer = transportContext.createServer(port, bootstraps); - // the port should normally be fixed, but for tests its useful to find an open port - port = shuffleServer.getPort(); - boundPort = port; - String authEnabledString = authEnabled ? "enabled" : "not enabled"; - logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, - registeredExecutorFile); - } - - @Override - public void initializeApplication(ApplicationInitializationContext context) { - String appId = context.getApplicationId().toString(); - try { - ByteBuffer shuffleSecret = context.getApplicationDataForService(); - logger.info("Initializing application {}", appId); - if (isAuthenticationEnabled()) { - secretManager.registerApp(appId, shuffleSecret); - } - } catch (Exception e) { - logger.error("Exception when initializing application {}", appId, e); - } - } - - @Override - public void stopApplication(ApplicationTerminationContext context) { - String appId = context.getApplicationId().toString(); - try { - logger.info("Stopping application {}", appId); - if (isAuthenticationEnabled()) { - secretManager.unregisterApp(appId); - } - blockHandler.applicationRemoved(appId, false /* clean up local dirs */); - } catch (Exception e) { - logger.error("Exception when stopping application {}", appId, e); - } - } - - @Override - public void initializeContainer(ContainerInitializationContext context) { - ContainerId containerId = context.getContainerId(); - logger.info("Initializing container {}", containerId); - } - - @Override - public void stopContainer(ContainerTerminationContext context) { - ContainerId containerId = context.getContainerId(); - logger.info("Stopping container {}", containerId); - } - - private File findRegisteredExecutorFile(String[] localDirs) { - for (String dir: localDirs) { - File f = new File(dir, "registeredExecutors.ldb"); - if (f.exists()) { - return f; - } - } - return new File(localDirs[0], "registeredExecutors.ldb"); - } - - /** - * Close the shuffle server to clean up any associated state. - */ - @Override - protected void serviceStop() { - try { - if (shuffleServer != null) { - shuffleServer.close(); - } - if (blockHandler != null) { - blockHandler.close(); - } - } catch (Exception e) { - logger.error("Exception when stopping service", e); - } - } - - // Not currently used - @Override - public ByteBuffer getMetaData() { - return ByteBuffer.allocate(0); - } -} diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java deleted file mode 100644 index 884861752e..0000000000 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.yarn.util; - -import java.util.NoSuchElementException; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.spark.network.util.ConfigProvider; - -/** Use the Hadoop configuration to obtain config values. */ -public class HadoopConfigProvider extends ConfigProvider { - private final Configuration conf; - - public HadoopConfigProvider(Configuration conf) { - this.conf = conf; - } - - @Override - public String get(String name) { - String value = conf.get(name); - if (value == null) { - throw new NoSuchElementException(name); - } - return value; - } -} |