#!/bin/bash

# To try the pyspark interop PoC, uncomment the following line
# exit 0

# example: /usr/lib64/Revo-7.3/R-3.1.3/lib64/R/library/RevoScaleR/utils/RevoSparkSubmitLauncher \
#/var/RevoShare/hue/RevoSparkFifoToSpark \
#/var/RevoShare/hue/RevoSparkFifoFromSpark

# for debugging
#set -x

# Executor-side debugging is switched on by exporting MRS_DEBUG_EXECUTOR
# with any non-empty value; the resulting flag is forwarded as --debug.
case "${MRS_DEBUG_EXECUTOR}" in
    "") debugExecutor=false ;;
    *)  debugExecutor=true ;;
esac

# Resolve the physical directory holding this script (following symlinks)
# so sibling helpers are found regardless of the caller's working directory.
# Every expansion is quoted so installation paths containing spaces work,
# and a failed `cd` aborts instead of silently yielding the caller's cwd.
SCRIPT_DIR=$(cd "$(dirname "$(readlink -nf "$0")")" && pwd -P) || {
    echo "could not resolve the directory of $0, aborting" >&2
    exit 1
}
. "${SCRIPT_DIR}/RevoTracing"

trace "$*"

# Defaults for the optional trailing arguments; older callers omit them.
loggingLevel=0
tmpFSWorkDir="None"
serverType="R"

# Mandatory positional arguments, consumed strictly in this order. Each one
# is assigned (possibly to an empty string) and then shifted off.
for _argName in fifoToSpark fifoFromSpark interpreterPath nameNode \
                nameNodePort user sessionPid appId jobId wait idleTimeout \
                executorCores executorMem driverMem executorOverheadMem \
                numExecutors extraSparkConfig; do
    printf -v "${_argName}" '%s' "$1"
    shift
done
unset _argName

# Forward compatibility: arguments added in later releases are only read
# when the caller actually supplied them; otherwise the defaults above stay.
if (( $# > 0 )); then  # added in 9.1: loggingLevel, tmpFSWorkDir
    loggingLevel="$1"
    shift
    tmpFSWorkDir="$1"
    shift
fi
if (( $# > 0 )); then  # added in 9.2: serverType
    serverType="$1"
    shift
fi

# Optional Spark master and master-side interpreter path (added in 9.3).
# "standalone" is resolved to a concrete spark:// URL:
#   - from $SPARK_HOME/conf/master when that file exists (AZTK clusters), or
#   - from this host's IP address when running under AzureML.
if [ "$#" -ne 0 ]; then # Introduced for 9.3
    master="$1"
    if [[ "${master}" == "standalone" ]]; then
        if [[ -z "${SPARK_HOME}" ]]; then
            echo "SPARK_HOME not found, and could not find the conf files for spark master address."
            exit 1
        elif [[ -d "${SPARK_HOME}" && -f "${SPARK_HOME}/conf/master" ]]; then # logit for aztk cluster check master ip
            value=$(< "${SPARK_HOME}/conf/master")
            master="spark://${value}:7077"
        elif [[ -n "${AZUREML_RUN_ID}" ]]; then
            # `hostname --ip-address` may print several space-separated
            # addresses; only the first one forms a valid master URL.
            read -r value _ <<< "$(hostname --ip-address)"
            master="spark://${value}:7077"
        else
            echo "file for master ip: $SPARK_HOME/conf/master not found, please specify master address rather than using standalone."
            exit 1
        fi
    fi
    shift
    masterInterpreterPath="$1"
fi

# Callers older than 9.3 pass no master. An empty value would make the
# generated `--master` option swallow the following spark-submit flag, so
# fall back to YARN (client deploy mode is requested by the launcher).
master="${master:-yarn}"


# build Spark shell id, and get shell pid if already running
SHELL_PREFIX="scaleR-spark-won2r0FHOA"
SHELL_ID="${SHELL_PREFIX}-${user}-${sessionPid}-${appId}"
if [ "${wait}" == "FALSE" ] || [ "${wait}" == "false" ]; then
    SHELL_ID="${SHELL_ID}-${jobId}"
fi


SHELL_PID=$(pgrep -f "${SHELL_ID}")

# if there is already a Spark app running, do nothing
if [ ! -z "${SHELL_PID}" ]; then
    trace "spark app already running with PID = ${SHELL_PID}"
    exit 0
fi

# Locate the MRS Hadoop artifacts relative to this script's directory:
# the mrs-spark-submit wrapper lives two levels up, the scaleR-spark JAR
# in the jar/ directory next to it. Abort early if either is unusable.
MRS_HADOOP_DIR_PATH="$SCRIPT_DIR/../.."

# find spark-submit script
export SPARK_SUBMIT="${MRS_HADOOP_DIR_PATH}/mrs-spark-submit"

if [ ! -x "${SPARK_SUBMIT}" ] ; then
  echo "${SPARK_SUBMIT} does not exist or cannot be executed, aborting" >&2
  exit 1
fi

# Canonicalize the JAR path. `readlink -e` prints nothing when the file is
# missing, so keep the unresolved candidate for a useful error message.
SPARK_JAR_CANDIDATE="${MRS_HADOOP_DIR_PATH}/jar/scaler-spark_2.11-0.1.0.jar"
SPARK_JAR=$(readlink -ne "${SPARK_JAR_CANDIDATE}")

trace "SPARK_JAR = ${SPARK_JAR}" 

# make sure the JAR is installed
if [ ! -f "${SPARK_JAR}" ]; then
    echo "could not find scaleR-spark JAR ${SPARK_JAR:-${SPARK_JAR_CANDIDATE}}, aborting" >&2
    exit 1
fi


# The FIFOs live in the parent directory of fifoToSpark. Create it on first
# use and make it world-writable (mode 777).
# NOTE(review): 777 presumably lets other accounts (e.g. the Spark process
# owner) write the response file here; confirm a tighter mode is not viable.
USER_SHARE_DIR=$(dirname "$fifoToSpark")
if ! [[ -d "${USER_SHARE_DIR}" ]]; then
    mkdir -p "${USER_SHARE_DIR}"
    chmod 777 "${USER_SHARE_DIR}"
fi


# we are starting a new shell so re-create FIFOs
trace "re-creating $fifoToSpark $fifoFromSpark"
# Quote the paths so names with spaces survive, and skip empty values: an
# empty fifoFromSpark would otherwise turn the glob into a bare `*` and
# delete everything in the current working directory.
if [ -n "${fifoToSpark}" ]; then
    rm -f -- "${fifoToSpark}"
fi
if [ -n "${fifoFromSpark}" ]; then
    rm -f -- "${fifoFromSpark}"*
fi


# TODO: replace /tmp with the shareDir or a session-specific dir
# Create the executable launcher script under an unpredictable name. If
# mktemp fails there is nothing to write or run, so abort immediately.
SHELL_STARTER=$(mktemp "/tmp/start-spark-shell.${SHELL_ID}.XXXXXXXX") || {
    echo "could not create a temporary Spark launcher script, aborting" >&2
    exit 1
}
chmod +x "${SHELL_STARTER}"

# TODO control debug mode globally and set the flag "-debug" below accordingly
# TODO send process output to a log file
# TODO do not remove ${SHELL_STARTER} in debug mode
# TODO document the meaning of the memory settings (esp. executor.memoryOverhead)

# This writes a shell script that is later run in a separate, backgrounded
# process. Launcher-side values are expanded now, while escaped (\$) ones are
# evaluated when the generated script runs.
#
# (1) The script was created by mktemp and its first task is to remove
# itself, which cleans the filesystem (the operating system only drops the
# /tmp entry; the contents stay readable until the script exits).
#
# (2) "--deploy-mode client" (the old "yarn-client" master) runs the driver
# in this spark-submit process rather than on the cluster; if the client
# crashes, the related Spark job is expected to be closed by the cluster
# manager.
#
# (3) The script takes the response path ($1, fifoFromSpark) and the
# request path ($2, fifoToSpark). If spark-submit fails, the last 20 lines
# of its output are written to $1 so the waiting launcher sees an ERROR.
#
# ${extraSparkConfig} is deliberately unquoted: it may carry several
# spark-submit options that must be word-split.
cat > "${SHELL_STARTER}" <<EOF
#!/bin/bash
    # remove this script at the very beginning to avoid leaving trash behind
    rm "${SHELL_STARTER}"
    SPARK_SUBMIT_TMP_LOG=\$(mktemp /tmp/spark-submit-log.XXXXXXXX)
    "${SPARK_SUBMIT}" \\
        --class com.microsoft.scaler.spark.api.SparkApp \\
        --master "${master}" \\
        --deploy-mode client \\
        --executor-cores ${executorCores} \\
        --executor-memory ${executorMem} \\
        --driver-memory ${driverMem} \\
        --conf spark.yarn.executor.memoryOverhead=${executorOverheadMem} \\
        --num-executors ${numExecutors} \\
        ${extraSparkConfig} \\
        "${SPARK_JAR}" \\
        --sparkShellId   "${SHELL_ID}" \\
        --fifoFromSpark  "\$1" \\
        --fifoToSpark    "\$2" \\
        --interpreterPath          "${interpreterPath}" \\
        --masterInterpreterPath    "${masterInterpreterPath}" \\
        --hadoopDirPath  "${MRS_HADOOP_DIR_PATH}" \\
        --nameNode       "${nameNode}" \\
        --nameNodePort   "${nameNodePort}" \\
        --idleTimeout    "${idleTimeout}" \\
        --debug          "${debugExecutor}" \\
        --user           "${user}" \\
        --loggingLevel   "${loggingLevel}" \\
        --tmpFSWorkDir   "${tmpFSWorkDir}" \\
        --serverType     "${serverType}" \\
        2>&1 | tee "\${SPARK_SUBMIT_TMP_LOG}"
    if [ "\${PIPESTATUS[0]}" -ne 0 ]; then
        ERROR=\$(tail -n 20 "\${SPARK_SUBMIT_TMP_LOG}" 2>&1)
        printf "ERROR: Fail to execute spark-submit. Last 20 lines' log:\n%s\n" "\${ERROR}" > "\$1"
    fi
    rm -f "\${SPARK_SUBMIT_TMP_LOG}"
EOF

trace "starting Spark shell in Screen via ${SHELL_STARTER} with id = ${SHELL_ID}"


# We use parentheses around the command with "&" instead of just "&" to
# avoid messing up stdout in the user's R session; enclosing a command
# with parentheses is called "command grouping" and it starts a new shell
# to evaluate the group of commands; see section 3.2.4.3 in
# https://www.gnu.org/software/bash/manual/bash.html#Lists
#
# The starter receives the response path first and the request path second.
if [ ! -z "${MRS_DEBUG}" ]; then
  # The debug log directory is not guaranteed to exist. If it is missing,
  # the redirection fails, the starter is never run, and the caller only
  # notices after the full startup timeout.
  SPARK_DEBUG_LOG_DIR="/tmp/spark-log"
  mkdir -p "${SPARK_DEBUG_LOG_DIR}"
  ("${SHELL_STARTER}" "${fifoFromSpark}" "${fifoToSpark}" > "${SPARK_DEBUG_LOG_DIR}/RevoSparkSubmit.$$.log" 2>&1 &)
else
  ("${SHELL_STARTER}" "${fifoFromSpark}" "${fifoToSpark}" > /dev/null 2>&1 &)
fi

# Wait up to 300 seconds for the response file to appear, then read and
# remove it. The content is either Spark's "INITIALIZED" signal or an ERROR
# report (for example the starter's spark-submit failure summary).
# Start from an empty response so an inherited SPARK_RESPONSE environment
# variable can never be mistaken for a reply from Spark.
SPARK_RESPONSE=""
trace "wait fifo: ${fifoFromSpark}"
TIMEOUT=$((SECONDS + 300))
until [ -f "${fifoFromSpark}" ] || [ ${SECONDS} -gt ${TIMEOUT} ]; do
    sleep 0.05
done

if [ -f "${fifoFromSpark}" ]; then
    trace "read fifo: ${fifoFromSpark}"
    # NOTE(review): -f succeeds as soon as the file exists; if the writer
    # does not create it atomically, this could read a partial response.
    SPARK_RESPONSE=$(cat -- "${fifoFromSpark}")
    rm -f -- "${fifoFromSpark}"
fi

# No reply at all (timeout, or an empty response file): report the failure
# to both the trace log and stdout, ask RevoSparkSubmitKiller to kill the
# app for this user/session/job, and fail.
if [[ -z "${SPARK_RESPONSE}" ]]; then
    startFailureMsg="ERROR: Failed to start a Spark application, perhaps because the cluster is busy with other jobs. Please check YARN usage and try again."
    trace "${startFailureMsg}"
    echo "${startFailureMsg}"
    killerArgs=("${user}" "${sessionPid}" "${appId}" "${jobId}" "${wait}" "kill" "app")
    "${SCRIPT_DIR}/RevoSparkSubmitKiller" "${killerArgs[@]}"
    exit 1
fi

# A response containing "ERROR" is forwarded verbatim to stderr and the
# launcher fails; otherwise Spark answered "INITIALIZED" and we are done.
if [[ "${SPARK_RESPONSE}" =~ "ERROR" ]]; then
    trace "response from Spark: ${SPARK_RESPONSE}"
    # printf keeps every line intact. The old `read`/`echo` loop stripped
    # leading tabs from stack-trace frames and mangled lines such as "-n".
    printf '%s\n' "${SPARK_RESPONSE}" >&2
    exit 1
fi # else will get "INITIALIZED" from spark
