#!/bin/bash
################################################################################
# Test for Slurm environment variables
################################################################################
if [[ -z "${SLURM_JOB_ID}" || -z "${SLURM_CPUS_PER_TASK}" || -z "${SLURM_MEM_PER_NODE}" || -z "${SLURM_JOB_NUM_NODES}" ]]; then
echo "ERROR: Some expected slurm environment variables are missing."
echo "Note this script should only be called from a slurm script. Perhaps you are trying to run the spark-start script outside of a slurm job."
echo ""
echo "Env var values:"
echo " SLURM_JOB_ID=${SLURM_JOB_ID}"
echo " SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}"
echo " SLURM_MEM_PER_NODE=${SLURM_MEM_PER_NODE}"
echo " SLURM_JOB_NUM_NODES=${SLURM_JOB_NUM_NODES}"
exit 1
fi
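# The directory-setup steps below call a fail() helper, but no definition
# appears anywhere in this section of the script. A minimal sketch of the
# assumed behavior (print an error and abort the job step) is provided here.
fail() {
echo "ERROR: $*" >&2
exit 1
}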
################################################################################
# Set up and launch standalone Spark cluster
################################################################################
# Check that both Spark and Python3 are available.
if ! command -v spark-submit >/dev/null 2>&1 || ! command -v python3 >/dev/null 2>&1; then
echo "ERROR: Cannot start Spark cluster because Spark or Python3 are not found."
echo "Tip: Load spark and python before calling spark-start script. For example:"
echo ""
echo "$ module load spark/3.5.1 python/3.12"
echo "$ spark-start"
echo ""
echo "As a workaround, spark/3.2.1 and python/3.10.4 will be autoloaded."
echo "This may create unexpected behavior that can be avoided by loading the"
echo "spark and python modules of your choice before calling spark-start."
module load spark/3.2.1 python/3.10.4 pyarrow/8.0.0
fi
# Set up standalone cluster daemon properties. The spark master and spark worker
# processes will run with the resources defined here (cores; memory in GB).
export SPARK_DAEMON_CORES=1
export SPARK_DAEMON_MEMORY=1
# Define static driver and executor resources here (cores; memory in GB).
export SPARK_DRIVER_CORES=2
export SPARK_DRIVER_MEMORY=5
#export SPARK_EXECUTOR_CORES=4
#export SPARK_EXECUTOR_MEMORY=26
# Reserve cpu cores and memory resources on the slurm compute node that runs the
# spark driver. The total capacity of the spark cluster available for assignment
# to executors will be reduced by this overhead.
export SPARK_OVERHEAD_CORES=${SPARK_DRIVER_CORES}
export SPARK_OVERHEAD_MEMORY=${SPARK_DRIVER_MEMORY}
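# For example, with the defaults above, the worker on the driver's node gives
# up 2 cores and 5 GB: a hypothetical 36-core/180 GB node would then offer
# 34 cores and 175 GB to executors.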
# Set up local scratch directories on all the nodes with
# strict permissions for local settings.
# TODO: Can this tmp dir be combined with XDG_RUNTIME_DIR above?
export SCRATCH="$(mktemp -d ${USER}.XXXXXXXXXX -p /tmp)"
echo "Writing spark tmp files to slurm compute nodes: ${SCRATCH}"
srun --label --export=ALL mkdir -p -m 700 \
"${SCRATCH}/local" \
"${SCRATCH}/tmp" \
"${SCRATCH}/work" \
|| fail "Could not set up node local directories"
# Create spark directory structure in ood job directory for global settings.
export OOD_JOB_HOME="${HOME}/.spark-local/${SLURM_JOB_ID}"
echo "Writing spark job files to user's HOME: ${HOME}/.spark-local/${SLURM_JOB_ID}/spark"
mkdir -p "${OOD_JOB_HOME}/spark/conf" \
"${OOD_JOB_HOME}/spark/pid" \
"${OOD_JOB_HOME}/spark/logs" \
|| fail "Could not set up spark directories"
export SPARK_CONF_DIR="${OOD_JOB_HOME}/spark/conf" # global auth secret, env vars
export SPARK_LOG_DIR="${OOD_JOB_HOME}/spark/logs" # master and worker logs
export SPARK_WORKER_DIR="${OOD_JOB_HOME}/spark/logs" # executor logs
export SPARK_PID_DIR="${OOD_JOB_HOME}/spark/pid" # master process pid
export SPARK_LOCAL_DIRS="${SCRATCH}/local" # driver and executor scratch space
# Set up worker node properties. The capacity of each worker node in the spark
# cluster is defined here. The resources defined in the slurm job are used to
# compute the resources available to the spark worker nodes.
SPARK_WORKER_CORES=${SLURM_CPUS_PER_TASK}
SPARK_WORKER_MEMORY=$(( SLURM_MEM_PER_NODE / 1024 ))  # slurm reports MB; convert to GB
# Generate shared secret for spark and WebUI authentication.
SPARK_SECRET=$(openssl rand -base64 32)
SPARK_WEBUI_SECRET=$(openssl rand -hex 32)
# Create config files with strict permissions.
touch "${SPARK_CONF_DIR}/spark-defaults.conf" "${SPARK_CONF_DIR}/spark-env.sh"
chmod 700 "${SPARK_CONF_DIR}/spark-defaults.conf" "${SPARK_CONF_DIR}/spark-env.sh"
# Create spark-defaults config file.
cat > "${SPARK_CONF_DIR}/spark-defaults.conf" <<EOF
# Enable RPC Authentication
spark.authenticate true
spark.authenticate.secret ${SPARK_SECRET}
# Disable job killing from Spark UI. By default, the web UI
# is not protected by authentication, which means any user
# with network access could terminate jobs if this were enabled.
spark.ui.killEnabled false
# Add Basic Authentication to Spark Cluster Web UI
spark.ui.filters edu.umich.arc.jonpot.BasicAuthenticationFilter
spark.edu.umich.arc.jonpot.BasicAuthenticationFilter.params username=admin,password=${SPARK_WEBUI_SECRET}
# Enable Spark application web UI (does not affect Spark cluster web UI).
spark.ui.enabled true
# Disable progress bar in console.
spark.ui.showConsoleProgress false
# Use proxy server for downloading additional, user-specified jars.
spark.driver.extraJavaOptions -Dhttp.proxyHost=proxy1.arc-ts.umich.edu -Dhttp.proxyPort=3128 -Dhttps.proxyHost=proxy1.arc-ts.umich.edu -Dhttps.proxyPort=3128
spark.executor.extraJavaOptions -Dhttp.proxyHost=proxy1.arc-ts.umich.edu -Dhttp.proxyPort=3128 -Dhttps.proxyHost=proxy1.arc-ts.umich.edu -Dhttps.proxyPort=3128
EOF
# Create spark-env config file.
cat > "${SPARK_CONF_DIR}/spark-env.sh" <<EOF
export SPARK_WORKER_DIR=${SPARK_WORKER_DIR}
export SPARK_LOCAL_DIRS=${SPARK_LOCAL_DIRS}
export SPARK_DAEMON_MEMORY=${SPARK_DAEMON_MEMORY}g
export SPARK_LOG_DIR=${SPARK_LOG_DIR}
export SPARK_PID_DIR=${SPARK_PID_DIR}
export SPARK_CONF_DIR=${SPARK_CONF_DIR}
export SCRATCH=${SCRATCH}
EOF
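# Spark's launch scripts source spark-env.sh automatically via SPARK_CONF_DIR;
# a later job step could also read these settings directly, e.g.:
#   source "${SPARK_CONF_DIR}/spark-env.sh"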
# A worker process runs on each available slurm node, but the spark driver
# runs on only one slurm node. The worker process that runs on the same node
# as the driver will need to use fewer resources. Calculate the remaining
# resources for the spark worker that runs on the same node as the spark driver.
if [ ${SPARK_WORKER_CORES} -ge ${SPARK_OVERHEAD_CORES} ] && [ ${SPARK_WORKER_MEMORY} -ge ${SPARK_OVERHEAD_MEMORY} ]; then
SPARK_WORKER_CORES_REMAINING=$(( SPARK_WORKER_CORES - SPARK_OVERHEAD_CORES ))
SPARK_WORKER_MEMORY_REMAINING=$(( SPARK_WORKER_MEMORY - SPARK_OVERHEAD_MEMORY ))
else
echo "Error: The slurm node running the spark driver does not have enough resources. The spark cluster did not start."
exit 1
fi
# ----
# Apr 28, 2022 - Removing much of this section for now. It was intended to prevent
# resource waste by giving warning messages when the cpu-to-memory ratios of the
# slurm job and spark executor configurations were mismatched. Despite this
# intention, the code is overly complex and makes assumptions about the
# user's executor configuration that cannot be known before the script is
# run. Keeping the code for now because it may be useful with the OOD app. I will
# keep the calculation of the total cluster capacity and write it to
# spark-env.sh so that the OOD app will know how many executors it can run.
# ----
SPARK_CLUSTER_CORES=$(( SPARK_WORKER_CORES_REMAINING + (SLURM_JOB_NUM_NODES - 1) * SPARK_WORKER_CORES ))
SPARK_CLUSTER_MEMORY=$(( SPARK_WORKER_MEMORY_REMAINING + (SLURM_JOB_NUM_NODES - 1) * SPARK_WORKER_MEMORY ))
echo "Spark cluster total capacity available for executors:"
echo " - ${SPARK_CLUSTER_CORES} cores"
echo " - ${SPARK_CLUSTER_MEMORY}G memory"
echo "export SPARK_CLUSTER_CORES=${SPARK_CLUSTER_CORES}" >> ${SPARK_CONF_DIR}/spark-env.sh
echo "export SPARK_CLUSTER_MEMORY=${SPARK_CLUSTER_MEMORY}" >> ${SPARK_CONF_DIR}/spark-env.sh
# The disabled code below calculated the total spark cluster capacity, then
# computed the maximum number of executors the cluster could support by cores
# and by memory. The smaller of the two would be the final number of executors
# to run, with warnings when compute resources were being wasted or when the
# cluster could not run 3 or more executors.
#SPARK_CLUSTER_CORES=$(( SPARK_WORKER_CORES_REMAINING + (SLURM_JOB_NUM_NODES - 1) * SPARK_WORKER_CORES ))
#SPARK_CLUSTER_MEMORY=$(( SPARK_WORKER_MEMORY_REMAINING + (SLURM_JOB_NUM_NODES - 1) * SPARK_WORKER_MEMORY ))
#SPARK_MAX_NUM_EXECUTOR_BY_CORES=$(( SPARK_CLUSTER_CORES / SPARK_EXECUTOR_CORES ))
#SPARK_MAX_NUM_EXECUTOR_BY_MEMORY=$(( SPARK_CLUSTER_MEMORY / SPARK_EXECUTOR_MEMORY ))
#if [ ${SPARK_MAX_NUM_EXECUTOR_BY_CORES} -ne ${SPARK_MAX_NUM_EXECUTOR_BY_MEMORY} ]; then
# echo "Warning: There is a resource mismatch."
# echo "Executor configuration:"
# echo " - ${SPARK_EXECUTOR_CORES} cores"
# echo " - ${SPARK_EXECUTOR_MEMORY}G memory"
# echo "Spark cluster total capacity for executors:"
# echo " - ${SPARK_CLUSTER_CORES} cores"
# echo " - ${SPARK_CLUSTER_MEMORY}G memory"
# echo "Spark cluster has capacity to run the lesser of ${SPARK_MAX_NUM_EXECUTOR_BY_CORES} or ${SPARK_MAX_NUM_EXECUTOR_BY_MEMORY} executors."
# echo "Consider adjusting the spark cluster or executor configuration to avoid wasting resources."
# if [ ${SPARK_MAX_NUM_EXECUTOR_BY_MEMORY} -lt ${SPARK_MAX_NUM_EXECUTOR_BY_CORES} ]; then
# SPARK_MAX_NUM_EXECUTOR=${SPARK_MAX_NUM_EXECUTOR_BY_MEMORY}
# else
# SPARK_MAX_NUM_EXECUTOR=${SPARK_MAX_NUM_EXECUTOR_BY_CORES}
# fi
#else
# SPARK_MAX_NUM_EXECUTOR=${SPARK_MAX_NUM_EXECUTOR_BY_CORES}
#fi
#if [ ${SPARK_MAX_NUM_EXECUTOR} -lt 3 ]; then
# echo "Warning: The spark cluster does not have enough resources to run 3 or more executors."
#fi
#SPARK_TOTAL_EXECUTOR_CORES=$(( SPARK_MAX_NUM_EXECUTOR * SPARK_EXECUTOR_CORES ))
# Start the master process and get its log file.
START_OUTPUT=$(${SPARK_HOME}/sbin/start-master.sh)
if [ $? -ne 0 ]; then
echo "Error: Spark master did not start."
echo "Output of the start script was: ${START_OUTPUT}"
exit 1
fi
echo "Spark master start script finished."
echo "Output of the start script was: ${START_OUTPUT}"
SPARK_MASTER_LOG=$(ls -1 ${SPARK_LOG_DIR}/*master*.out | head -n 1)
echo "SPARK_MASTER_LOG: ${SPARK_MASTER_LOG}"
# Wait here until master starts and logs its URL. There is sometimes a
# write delay with network mounted filesystems.
LOOP_COUNT=0
while ! grep -q "started at http://" ${SPARK_MASTER_LOG}; do
echo -n ".."
sleep 2
if [ ${LOOP_COUNT} -gt 300 ]; then
echo "Error: Spark Master did not start. Web UI address not found in master log."
exit 1
fi
LOOP_COUNT=$(( LOOP_COUNT + 1 ))
done
LOOP_COUNT=0
while ! grep -q "spark://" ${SPARK_MASTER_LOG}; do
echo -n ".."
sleep 2
if [ ${LOOP_COUNT} -gt 300 ]; then
echo "Error: Spark Master did not start. Spark URL not found in master log."
exit 1
fi
LOOP_COUNT=$(( LOOP_COUNT + 1 ))
done
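# Note: each wait loop above polls every 2 seconds for up to 300 iterations,
# so the master has roughly 10 minutes per check before the script gives up.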
# Get URL and ports from log file and add to spark-env.sh.
# TODO: What if spark log format changes? Will https on master UI break pattern matching below?
SPARK_MASTER_URL=$(grep -m 1 "spark://" ${SPARK_MASTER_LOG} | sed "s/^.*spark:\/\//spark:\/\//")
SPARK_MASTER_WEBUI=$(grep -i -m 1 "started at http://" ${SPARK_MASTER_LOG} | sed "s/^.*http:\/\//http:\/\//")
echo "export SPARK_MASTER_URL=${SPARK_MASTER_URL}" >> ${SPARK_CONF_DIR}/spark-env.sh
echo "export SPARK_MASTER_WEBUI=${SPARK_MASTER_WEBUI}" >> ${SPARK_CONF_DIR}/spark-env.sh
# Log these values for troubleshooting.
echo "SPARK_MASTER_URL: ${SPARK_MASTER_URL}"
echo "SPARK_MASTER_WEBUI: ${SPARK_MASTER_WEBUI}"
# Create a worker starter script for non-daemonized spark workers.
cat > ${SPARK_CONF_DIR}/sparkworker.sh <<EOF
#!/bin/bash
ulimit -u 16384 -n 16384
export SPARK_CONF_DIR=${SPARK_CONF_DIR}
export SPARK_WORKER_CORES=${SPARK_WORKER_CORES}
export SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}g
logf="${SPARK_LOG_DIR}/spark-worker-\$(hostname).out"
exec spark-class org.apache.spark.deploy.worker.Worker "${SPARK_MASTER_URL}" &> "\${logf}"
EOF
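# Design note: the worker runs in the foreground via 'exec' (rather than the
# daemonized start-worker.sh) so that each srun task stays attached to its
# worker process and slurm can track and clean it up with the job.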
# Broadcast the worker script to all slurm nodes.
chmod +x ${SPARK_CONF_DIR}/sparkworker.sh
srun /usr/bin/cp ${SPARK_CONF_DIR}/sparkworker.sh "${SCRATCH}/sparkworker.sh" \
|| fail "Could not broadcast worker start script to nodes"
rm -f ${SPARK_CONF_DIR}/sparkworker.sh
# Modify the worker script on the node that will run the spark driver. Reduce
# the resources requested by the worker to leave resources for the driver.
sed -i "s/SPARK_WORKER_CORES=${SPARK_WORKER_CORES}/SPARK_WORKER_CORES=${SPARK_WORKER_CORES_REMAINING}/;
s/SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}g/SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY_REMAINING}g/" \
"${SCRATCH}/sparkworker.sh"
# Start the worker nodes.
srun --label --export=ALL --wait=0 --cpus-per-task=${SLURM_CPUS_PER_TASK} "${SCRATCH}/sparkworker.sh" &
WORKERS_PID=$!
echo "WORKERS_PID=${WORKERS_PID}"
echo "export WORKERS_PID=${WORKERS_PID}" >> ${SPARK_CONF_DIR}/spark-env.sh