#!/bin/bash

# example: /usr/lib64/Revo-7.3/R-3.1.3/lib64/R/library/RevoScaleR/utils/RevoSparkSubmitLauncher \
#/var/RevoShare/hue/RevoSparkFifoToSpark \
#/var/RevoShare/hue/RevoSparkFifoFromSpark

# for debugging
#set -x

#TODO: this script is deprecated since MRS 9.2.0. Clean it when MRS 9.1.0 is out of support.

# Resolve the directory that contains this script, following symlinks, so the
# sibling helper scripts can be found regardless of the caller's working
# directory. Every nested expansion is quoted so that an install path
# containing whitespace is not split before it reaches dirname/cd.
SCRIPT_DIR=$(cd "$(dirname "$(readlink -nf "$0")")" && pwd -P)

# RevoTracing lives next to this script; it is expected to define the
# trace function used throughout the rest of the file.
. "${SCRIPT_DIR}/RevoTracing"

# Hand the complete argument list (joined into one string) to trace.
trace "$*"

# Defaults for the optional trailing arguments; they remain in effect when
# an older caller does not supply them.
loggingLevel=0
tmpFSWorkDir="/dev/shm"
serverType="R"

# Mandatory positional arguments, listed in the order callers pass them.
# Each name becomes a script variable holding the corresponding argument
# (empty when the caller passed fewer arguments).
for argName in fifoToSpark fifoFromSpark interpreterPath nameNode \
               nameNodePort user sessionPid appId jobId wait idleTimeout \
               executorCores executorMem driverMem executorOverheadMem \
               numExecutors extraSparkConfig; do
    printf -v "${argName}" '%s' "$1"
    shift
done
unset argName

# Forward compatibility: the remaining arguments were added in later
# releases, so only override the defaults when they are actually present.
if (( $# != 0 )); then  # loggingLevel and tmpFSWorkDir: introduced for 9.1
    loggingLevel="$1"
    shift
    tmpFSWorkDir="$1"
    shift
fi
if (( $# != 0 )); then  # serverType: introduced for 9.2
    serverType="$1"
fi




# Compose the identifier of the Spark shell for this user/session/app.
# Non-waiting jobs additionally get the job id appended to the identifier.
SHELL_PREFIX="scaleR-spark-won2r0FHOA"
SHELL_ID="${SHELL_PREFIX}-${user}-${sessionPid}-${appId}"
case "${wait}" in
    FALSE|false)
        SHELL_ID="${SHELL_ID}-${jobId}"
        ;;
esac

# Look for a process whose full command line mentions that identifier.
SHELL_PID=$(pgrep -f "${SHELL_ID}")

# A match means a Spark app for this identifier is already up: nothing to do.
if [[ -n "${SHELL_PID}" ]]; then
    trace "spark app already running with PID = ${SHELL_PID}"
    exit 0
fi

# Split a protocol-framed reply into its stdout and stderr sections.
#
# The reply is scanned one whitespace-separated word at a time. A word that
# contains %PROTOCOL-STDOUT%, %PROTOCOL-STDERR% or %PROTOCOL-END% is a marker:
# it switches the current section (stdout, stderr, none) and is dropped.
# Every other word is appended to the section that is currently active;
# words seen while no section is active are ignored.
#
# Arguments:
#   $@ - the reply text, as one string or already split into words
# Globals written:
#   stdout_msg - words of the stdout section joined by a literal "\n"
#   stderr_msg - words of the stderr section joined by a literal "\n"
#   (the separator is the two characters backslash + n, which callers
#   render as line breaks with `echo -e`)
#
# Scratch variables are local, and the reply is split with `read -a` instead
# of an unquoted expansion so that words such as "*" are never glob-expanded
# against the current directory.
function extract_std
{
    local -a words=()
    local word
    local section=""

    stdout_msg=""
    stderr_msg=""

    # read returns non-zero when it reaches end of input without finding the
    # NUL delimiter; that is the normal case here, so the status is ignored.
    read -r -d '' -a words <<< "$*" || true

    for word in "${words[@]}"
    do
        case "$word" in
            *"%PROTOCOL-STDOUT%"*) section="stdout"; continue ;;
            *"%PROTOCOL-STDERR%"*) section="stderr"; continue ;;
            *"%PROTOCOL-END%"*)    section="";       continue ;;
        esac

        case "$section" in
            stdout)
                if [[ -z "$stdout_msg" ]]; then
                    stdout_msg=${word}
                else
                    stdout_msg="${stdout_msg}\n${word}"
                fi
                ;;
            stderr)
                if [[ -z "$stderr_msg" ]]; then
                    stderr_msg=${word}
                else
                    stderr_msg="${stderr_msg}\n${word}"
                fi
                ;;
        esac
    done
}

# Locate the MRS Hadoop directory by asking the mrs-hadoop-job backend.
if [ ! -f /usr/bin/mrs-hadoop-job ]; then
  echo "Failed to find /usr/bin/mrs-hadoop-job command, please check whether Microsoft Machine Learning Server is installed correctly." >&2
  exit 1
fi

# extract_std splits the reply into the stdout_msg / stderr_msg globals.
# The reply is passed quoted so the calling shell does not glob-expand it;
# extract_std does the word splitting itself.
msg=$(/usr/bin/mrs-hadoop-job path)
exit_code=$?
extract_std "$msg"

# Exit status 1 is reported as a warning and execution continues; any other
# non-zero status is fatal.
if [ "$exit_code" -ne 0 ]; then
    if [ "$exit_code" -eq 1 ]; then
        echo -e "Warning from backend api /usr/bin/mrs-hadoop-job path: $stderr_msg"
    else
        echo -e "Error while running backend api /usr/bin/mrs-hadoop-job path: $stderr_msg" >&2
        exit 1
    fi
fi

# The value must be quoted here: with an unquoted empty string the test
# degenerates to `[ == "" ]`, which is a syntax error that evaluates as
# false, so an empty reply would slip through unnoticed.
if [ -z "$stdout_msg" ]; then
    echo -e "Fail to get MRS_HADOOP_DIR from /usr/bin/mrs-hadoop-job path, aborting" >&2
    exit 1
fi

# The Hadoop directory is two levels above the directory of the reported path.
MRS_HADOOP_DIR_PATH="$(dirname -- "$stdout_msg")/../.."

# Spark jobs are submitted through the MRS wrapper around spark-submit.
export SPARK_SUBMIT=/usr/bin/mrs-spark-submit

[[ -x "${SPARK_SUBMIT}" ]] || {
  echo "${SPARK_SUBMIT} does not exist or cannot be executed, aborting" >&2
  exit 1
}


# Determine the Spark version: from the --version output keep the lines where
# "version" is followed by a digit, and strip everything up to that number.
SPARK_VERSION=$(
    "${SPARK_SUBMIT}" --version 2>&1 \
        | grep "version[[:space:]][[:digit:]]" \
        | sed "s/^.*version[[:space:]]*//"
)

trace "SPARK_SUBMIT = ${SPARK_SUBMIT}, version = ${SPARK_VERSION}"

# Pick the JAR matching the Spark generation: the 2.10 build for versions
# that sort before "2.0.0", the 2.11 build otherwise.
# NOTE(review): [[ < ]] compares strings lexicographically, not as version
# numbers.
sparkJarName="scaler-spark_2.11-0.1.0.jar"
if [[ "${SPARK_VERSION}" < "2.0.0" ]]; then
    sparkJarName="scaler-spark_2.10-0.1.0.jar"
fi
SPARK_JAR=$(readlink -ne "${MRS_HADOOP_DIR_PATH}/jar/${sparkJarName}")

trace "SPARK_JAR = ${SPARK_JAR}"

# readlink -e prints nothing when the file is missing, so this also catches
# a JAR that is not installed.
if [[ ! -f "${SPARK_JAR}" ]]; then
    echo "could not find scaleR-spark JAR ${SPARK_JAR}, aborting" >&2
    exit 1
fi


# Create the user share directory (the directory holding the FIFOs) if it
# does not exist yet, and open it up to all users.
USER_SHARE_DIR=$(dirname "$fifoToSpark")
if [ ! -d "$USER_SHARE_DIR" ]; then
    mkdir -p "$USER_SHARE_DIR"
    chmod 777 "$USER_SHARE_DIR"
fi


# We are starting a new shell, so re-create the FIFOs from scratch.
trace "re-creating $fifoToSpark $fifoFromSpark"
# The paths are quoted so that whitespace or glob characters in them cannot
# make rm delete unrelated files; only the trailing * is left unquoted so it
# still removes every file whose name starts with the from-Spark FIFO path.
rm -f -- "$fifoToSpark" "${fifoFromSpark}"*
mkfifo "$fifoToSpark"
mkfifo "$fifoFromSpark"


# TODO: replace /tmp with the shareDir or a session-specific dir
# Create a uniquely named, executable file that will hold the generated
# starter script. The template is quoted so that a SHELL_ID containing
# whitespace or glob characters stays a single mktemp argument, and a mktemp
# failure aborts here instead of surfacing later as a confusing redirect error.
SHELL_STARTER=$(mktemp "/tmp/start-spark-shell.${SHELL_ID}.XXXXXXXX") || {
    echo "could not create the Spark shell starter script in /tmp, aborting" >&2
    exit 1
}
chmod +x "${SHELL_STARTER}"

# TODO control debug mode globally and set the flag "-debug" below accordingly
# TODO send process output to a log file
# TODO do not remove ${SHELL_STARTER} in debug mode
# TODO document the meaning of the memory settings (esp. executor.memoryOverhead)

# The spark-submit command-line syntax for YARN client mode changed in 1.6.0.
# Start from the newer form
# (https://spark.apache.org/docs/1.6.0/running-on-yarn.html) ...
DEPLOY_MODE="--master yarn --deploy-mode client"
# ... and fall back to the older one for versions that sort before "1.6.0"
# (https://spark.apache.org/docs/1.5.2/running-on-yarn.html).
if [[ "${SPARK_VERSION}" < "1.6.0" ]]; then
    DEPLOY_MODE="--master yarn-client"
fi


# Generate the starter script that actually runs spark-submit. The launcher
# later runs it as a detached background process, passing the from-Spark FIFO
# as $1 and the to-Spark FIFO as $2.
#
# (1) The script is created by mktemp and its first task is to remove
# itself which cleans the filesystem (although operating system will
# just remove the entry from /tmp/ but keep the contents of the file
# accessible until the script exits).
#
# (2) "--master yarn-client" means that the client application will not
# be handled by YARN and if it crashes the related Spark job should be
# closed by YARN
#
# (3) The combined stdout/stderr of spark-submit is piped through tee into a
# temporary log. If spark-submit exits with a non-zero status, the last 20
# lines of that log are written to the from-Spark FIFO so the waiting
# launcher can show them. Every line of the spark-submit command, including
# the last option, must end in a line continuation: otherwise "2>&1 | tee"
# becomes a separate empty command, nothing is captured and PIPESTATUS[0] is
# always 0.
#
# Heredoc escaping: unescaped ${...} are expanded now, while the script is
# generated; \$ and \\ are emitted as $ and \ and take effect only when the
# generated script runs. ${extraSparkConfig} is deliberately unquoted in the
# generated script so that it can contribute several options.
#
cat > "${SHELL_STARTER}" <<EOF
#!/bin/bash
    # remove this script at the very beginning to avoid leaving trash behind
    rm "${SHELL_STARTER}"
    SPARK_SUBMIT_TMP_LOG=\$(mktemp /tmp/spark-submit-log.XXXXXXXX)
    "${SPARK_SUBMIT}" \\
        --class com.microsoft.scaler.spark.RREServer \\
        ${DEPLOY_MODE} \\
        --executor-cores ${executorCores} \\
        --executor-memory ${executorMem} \\
        --driver-memory ${driverMem} \\
        --conf spark.yarn.executor.memoryOverhead=${executorOverheadMem} \\
        --num-executors ${numExecutors} \\
        --conf spark.files.overwrite=true \\
        --conf spark.rpc=akka \\
        ${extraSparkConfig} \\
        "${SPARK_JAR}" \\
        --sparkShellId   "${SHELL_ID}" \\
        --fifoFromSpark  "\$1" \\
        --fifoToSpark    "\$2" \\
        --interpreterPath          "${interpreterPath}" \\
        --hadoopDirPath  "${MRS_HADOOP_DIR_PATH}" \\
        --nameNode       "${nameNode}" \\
        --nameNodePort   "${nameNodePort}" \\
        --idleTimeout    "${idleTimeout}" \\
        --debug          false \\
        --user           "${user}" \\
        --loggingLevel   "${loggingLevel}" \\
        --tmpFSWorkDir   "${tmpFSWorkDir}" \\
        --serverType     "${serverType}" \\
        2>&1 | tee "\${SPARK_SUBMIT_TMP_LOG}"
    if [ \${PIPESTATUS[0]} -ne 0 ]; then
        if [ -e "\$1" ]; then
            ERROR=\$(tail -n 20 "\${SPARK_SUBMIT_TMP_LOG}" 2>&1)
            # Redirect namepipe as file descriptor 4 to avoid blocking. 
            # Cannot simply use & to unblock because it can cause zombie SHELL_STARTER process
            exec 4<> "\$1"
            echo -e "ERROR: Fail to execute spark-submit. Last 20 lines' log:\n\${ERROR}" >&4
        fi
    fi
    rm "\${SPARK_SUBMIT_TMP_LOG}"
EOF

trace "starting Spark shell in Screen via ${SHELL_STARTER} with id = ${SHELL_ID}"


# Decide where the starter's output goes: it is discarded unless MRS_DEBUG is
# set to a non-empty value, in which case it is kept in a per-launcher log.
starterLog=/dev/null
if [[ -n "${MRS_DEBUG}" ]]; then
  starterLog="/tmp/spark-log/RevoSparkSubmit.$$.log"
fi

# The backgrounded command is wrapped in parentheses rather than using a bare
# "&" so that stdout of the user's R session is not disturbed; the
# parentheses start a subshell to evaluate the enclosed command; see
# section 3.2.4.3 ("Grouping Commands") in
# https://www.gnu.org/software/bash/manual/bash.html#Lists
#
("${SHELL_STARTER}" "${fifoFromSpark}" "${fifoToSpark}" > "${starterLog}" 2>&1 &)

# Wait (at most 300 seconds) for Spark to answer on the from-Spark FIFO; a
# successful start is signalled with "INITIALIZED". The FIFO path is quoted so
# that whitespace or glob characters in it are not split or expanded.
SPARK_RESPONSE=$(timeout 300 cat "$fifoFromSpark")

# No answer at all: report the failure, ask RevoSparkSubmitKiller to kill the
# app for this session, and give up.
if [ -z "${SPARK_RESPONSE}" ]; then
    startFailureMsg="ERROR: Failed to start a Spark application, perhaps because the cluster is busy with other jobs. Please check YARN usage and try again."
    trace "${startFailureMsg}"
    echo "${startFailureMsg}"
    "${SCRIPT_DIR}/RevoSparkSubmitKiller" "${user}" "${sessionPid}" "${appId}" \
                                          "${jobId}" "${wait}" "kill" "app"
    exit 1
fi

# An answer containing "ERROR" is relayed line by line to stdout and treated
# as a failed start.
if [[ "${SPARK_RESPONSE}" =~ "ERROR" ]]; then
    trace "response from Spark: ${SPARK_RESPONSE}"
    while read -r line; do
        echo "$line"
    done <<<"${SPARK_RESPONSE}"
    exit 1
fi # else will get "INITIALIZED" from spark
