author     Reynold Xin <rxin@databricks.com>    2016-02-28 17:25:07 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-28 17:25:07 -0800
commit     9e01dcc6446f8648e61062f8afe62589b9d4b5ab (patch)
tree       ae3c7015e950de315a490ce58c181671bfd12907 /network/yarn
parent     cca79fad66c4315b0ed6de59fd87700a540e6646 (diff)
[SPARK-13529][BUILD] Move network/* modules into common/network-*
## What changes were proposed in this pull request?

As the title says, this moves the three modules currently in network/ into common/network-*. This removes one top-level, non-user-facing folder.

## How was this patch tested?

Compilation and existing tests. We should run both SBT and Maven.

Author: Reynold Xin <rxin@databricks.com>

Closes #11409 from rxin/SPARK-13529.
Diffstat (limited to 'network/yarn')
-rw-r--r--  network/yarn/pom.xml                                                                     | 148
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java         | 224
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java  |  42
3 files changed, 0 insertions, 414 deletions
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
deleted file mode 100644
index 3cb44324f2..0000000000
--- a/network/yarn/pom.xml
+++ /dev/null
@@ -1,148 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-network-yarn_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project YARN Shuffle Service</name>
- <url>http://spark.apache.org/</url>
- <properties>
- <sbt.project.name>network-yarn</sbt.project.name>
- <!-- Make sure all Hadoop dependencies are provided to avoid repackaging. -->
- <hadoop.deps.scope>provided</hadoop.deps.scope>
- <shuffle.jar>${project.build.directory}/scala-${scala.binary.version}/spark-${project.version}-yarn-shuffle.jar</shuffle.jar>
- <shade>org/spark-project/</shade>
- </properties>
-
- <dependencies>
- <!-- Core dependencies -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
-
- <!-- Provided dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${shuffle.jar}</outputFile>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <relocations>
- <relocation>
- <pattern>com.fasterxml.jackson</pattern>
- <shadedPattern>org.spark-project.com.fasterxml.jackson</shadedPattern>
- <includes>
- <include>com.fasterxml.jackson.**</include>
- </includes>
- </relocation>
- </relocations>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- probes to validate that the dependencies which must be shaded actually are shaded -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <phase>verify</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target>
- <macrodef name="shaded">
- <attribute name="resource"/>
- <sequential>
- <fail message="Not found ${shade}@{resource}">
- <condition>
- <not>
- <resourceexists>
- <zipentry zipfile="${shuffle.jar}" name="${shade}@{resource}"/>
- </resourceexists>
- </not>
- </condition>
- </fail>
- </sequential>
- </macrodef>
- <echo>Verifying dependency shading</echo>
- <shaded resource="com/fasterxml/jackson/core/JsonParser.class" />
- <shaded resource="com/fasterxml/jackson/annotation/JacksonAnnotation.class" />
- <shaded resource="com/fasterxml/jackson/databind/JsonSerializer.class" />
- </target>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
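
The antrun probe in the pom above fails the build when a relocated Jackson class is missing from the shaded jar. For reference, here is a minimal standalone sketch of the same check written against java.util.zip; the fallback jar name and the class name ShadeCheck are hypothetical, while the org/spark-project/ prefix and the probed resources mirror the ${shade} property and the <shaded> calls in the pom:

import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ShadeCheck {
  public static void main(String[] args) throws Exception {
    // Path to the shaded jar; pass it in, or fall back to an assumed name.
    String jar = args.length > 0 ? args[0] : "spark-yarn-shuffle.jar";
    String prefix = "org/spark-project/";
    String[] resources = {
      "com/fasterxml/jackson/core/JsonParser.class",
      "com/fasterxml/jackson/annotation/JacksonAnnotation.class",
      "com/fasterxml/jackson/databind/JsonSerializer.class"
    };
    try (ZipFile zip = new ZipFile(jar)) {
      for (String resource : resources) {
        // Same condition as the ant <zipentry> probe: the relocated entry
        // must exist inside the shaded jar.
        ZipEntry entry = zip.getEntry(prefix + resource);
        if (entry == null) {
          throw new IllegalStateException("Not found " + prefix + resource);
        }
      }
    }
    System.out.println("Verified dependency shading under " + prefix);
  }
}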
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
deleted file mode 100644
index ba6d30a74c..0000000000
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.yarn;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.sasl.SaslServerBootstrap;
-import org.apache.spark.network.sasl.ShuffleSecretManager;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
-import org.apache.spark.network.util.TransportConf;
-import org.apache.spark.network.yarn.util.HadoopConfigProvider;
-
-/**
- * An external shuffle service used by Spark on Yarn.
- *
- * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
- * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
- * The application also automatically derives the service port through `spark.shuffle.service.port`
- * specified in the Yarn configuration. This is so that both the clients and the server agree on
- * the same port to communicate on.
- *
- * The service also optionally supports authentication. This ensures that executors from one
- * application cannot read the shuffle files written by those from another. This feature can be
- * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
- * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
- * the case of the service port, will not inherit this setting from the Yarn configuration. This
- * is because an application running on the same Yarn cluster may choose to not use the external
- * shuffle service, in which case its setting of `spark.authenticate` should be independent of
- * the service's.
- */
-public class YarnShuffleService extends AuxiliaryService {
- private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
-
- // Port on which the shuffle server listens for fetch requests
- private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
- private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
-
- // Whether the shuffle server should authenticate fetch requests
- private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
- private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
-
- // An entity that manages the shuffle secret per application
- // This is used only if authentication is enabled
- private ShuffleSecretManager secretManager;
-
- // The actual server that serves shuffle files
- private TransportServer shuffleServer = null;
-
- // Handles registering executors and opening shuffle blocks
- @VisibleForTesting
- ExternalShuffleBlockHandler blockHandler;
-
- // Where to store & reload executor info for recovering state after an NM restart
- @VisibleForTesting
- File registeredExecutorFile;
-
- // just for testing when you want to find an open port
- @VisibleForTesting
- static int boundPort = -1;
-
- // just for integration tests that want to look at this file -- in general not sensible as
- // a static
- @VisibleForTesting
- static YarnShuffleService instance;
-
- public YarnShuffleService() {
- super("spark_shuffle");
- logger.info("Initializing YARN shuffle service for Spark");
- instance = this;
- }
-
- /**
- * Return whether authentication is enabled as specified by the configuration.
- * If so, fetch requests will fail unless the appropriate authentication secret
- * for the application is provided.
- */
- private boolean isAuthenticationEnabled() {
- return secretManager != null;
- }
-
- /**
- * Start the shuffle server with the given configuration.
- */
- @Override
- protected void serviceInit(Configuration conf) {
-
- // In case this NM was killed while there were running Spark applications, we need to restore
- // lost state for the existing executors. We look for an existing file in the NM's local dirs.
- // If we don't find one, then we choose a file to use to save the state next time. Even if
- // an application was stopped while the NM was down, we expect YARN to call stopApplication()
- // when it comes back.
- registeredExecutorFile =
- findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
-
- TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
- // If authentication is enabled, set up the shuffle server to use a
- // special RPC handler that filters out unauthenticated fetch requests
- boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- try {
- blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
- } catch (Exception e) {
- logger.error("Failed to initialize external shuffle service", e);
- }
-
- List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
- if (authEnabled) {
- secretManager = new ShuffleSecretManager();
- bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
- }
-
- int port = conf.getInt(
- SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportContext transportContext = new TransportContext(transportConf, blockHandler);
- shuffleServer = transportContext.createServer(port, bootstraps);
- // The port should normally be fixed, but for tests it's useful to find an open port.
- port = shuffleServer.getPort();
- boundPort = port;
- String authEnabledString = authEnabled ? "enabled" : "not enabled";
- logger.info("Started YARN shuffle service for Spark on port {}. " +
- "Authentication is {}. Registered executor file is {}", port, authEnabledString,
- registeredExecutorFile);
- }
-
- @Override
- public void initializeApplication(ApplicationInitializationContext context) {
- String appId = context.getApplicationId().toString();
- try {
- ByteBuffer shuffleSecret = context.getApplicationDataForService();
- logger.info("Initializing application {}", appId);
- if (isAuthenticationEnabled()) {
- secretManager.registerApp(appId, shuffleSecret);
- }
- } catch (Exception e) {
- logger.error("Exception when initializing application {}", appId, e);
- }
- }
-
- @Override
- public void stopApplication(ApplicationTerminationContext context) {
- String appId = context.getApplicationId().toString();
- try {
- logger.info("Stopping application {}", appId);
- if (isAuthenticationEnabled()) {
- secretManager.unregisterApp(appId);
- }
- blockHandler.applicationRemoved(appId, false /* cleanupLocalDirs */);
- } catch (Exception e) {
- logger.error("Exception when stopping application {}", appId, e);
- }
- }
-
- @Override
- public void initializeContainer(ContainerInitializationContext context) {
- ContainerId containerId = context.getContainerId();
- logger.info("Initializing container {}", containerId);
- }
-
- @Override
- public void stopContainer(ContainerTerminationContext context) {
- ContainerId containerId = context.getContainerId();
- logger.info("Stopping container {}", containerId);
- }
-
- private File findRegisteredExecutorFile(String[] localDirs) {
- for (String dir: localDirs) {
- File f = new File(dir, "registeredExecutors.ldb");
- if (f.exists()) {
- return f;
- }
- }
- return new File(localDirs[0], "registeredExecutors.ldb");
- }
-
- /**
- * Close the shuffle server to clean up any associated state.
- */
- @Override
- protected void serviceStop() {
- try {
- if (shuffleServer != null) {
- shuffleServer.close();
- }
- if (blockHandler != null) {
- blockHandler.close();
- }
- } catch (Exception e) {
- logger.error("Exception when stopping service", e);
- }
- }
-
- // Not currently used
- @Override
- public ByteBuffer getMetaData() {
- return ByteBuffer.allocate(0);
- }
-}
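
The class Javadoc above spells out the client-side contract: an application opts in with spark.shuffle.service.enabled, must agree with the NodeManager on spark.shuffle.service.port, and must set spark.authenticate itself because that flag is not inherited from the YARN configuration. A minimal sketch of that application-side configuration, assuming Spark's public SparkConf API; the class name, app name, and port value (the service default, 7337) are illustrative:

import org.apache.spark.SparkConf;

public class ShuffleServiceClientConf {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("uses-external-shuffle") // illustrative name
      // Opt in to the external shuffle service running in the NodeManager.
      .set("spark.shuffle.service.enabled", "true")
      // Must match spark.shuffle.service.port in the YARN configuration.
      .set("spark.shuffle.service.port", "7337")
      // Needed only when the service authenticates: the application must
      // set this itself; it is not inherited from the YARN configuration.
      .set("spark.authenticate", "true");
    System.out.println(conf.toDebugString());
  }
}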
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
deleted file mode 100644
index 884861752e..0000000000
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.yarn.util;
-
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.spark.network.util.ConfigProvider;
-
-/** Use the Hadoop configuration to obtain config values. */
-public class HadoopConfigProvider extends ConfigProvider {
- private final Configuration conf;
-
- public HadoopConfigProvider(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public String get(String name) {
- String value = conf.get(name);
- if (value == null) {
- throw new NoSuchElementException(name);
- }
- return value;
- }
-}
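
HadoopConfigProvider is the adapter that lets the Hadoop Configuration loaded by the NodeManager feed Spark's network layer; that is exactly how serviceInit() builds its TransportConf in the diff above. A small usage sketch under that assumption; the class name and the config key shown are illustrative:

import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;

import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

public class HadoopConfigProviderExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("spark.shuffle.io.serverThreads", "8"); // illustrative key

    // Wrap the Hadoop configuration for Spark's network layer, as
    // YarnShuffleService.serviceInit() does.
    HadoopConfigProvider provider = new HadoopConfigProvider(hadoopConf);
    TransportConf transportConf = new TransportConf("shuffle", provider);

    System.out.println(provider.get("spark.shuffle.io.serverThreads")); // "8"
    try {
      provider.get("spark.some.unset.key");
    } catch (NoSuchElementException e) {
      // Unset keys throw rather than returning null.
      System.out.println("missing: " + e.getMessage());
    }
  }
}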