#!/bin/bash

SCRIPT_DIR=$(cd "`dirname $(readlink -nf "$0")`"; pwd -P)
. "${SCRIPT_DIR}/RevoTracing"

trace "$*"
# script parameters
user="$1"
sessionPid="$2"
appId="$3"
jobId="$4"
wait="$5"
# |-------------------------------------------------------------|
# |mode  | stop        | clean     | kill                       |
# |------|-------------|-----------|----------------------------|
# |value | fifoToSpark | "session" | "user" / "session" / "app" |
# |-------------------------------------------------------------|
mode="$6"
value="$7"

SHELL_PREFIX="scaleR-spark-won2r0FHOA"

if [ "${mode}" == "stop" ]; then
    # in 'stop' mode, we send stop message to spark to let 
    # spark itself stop and exit

    trace "stop mode"

    # build Spark shell id, and get shell pid if already running
    SHELL_ID="${SHELL_PREFIX}-${user}-${sessionPid}-${appId}"
    if [ "${wait}" == "FALSE" ] || [ "${wait}" == "false" ]; then
        SHELL_ID="${SHELL_ID}-${jobId}"
    fi
    trace "SHELL_ID=${SHELL_ID}"

    # Remove RevoSparkFifoToSpark
    rm -f ${value}
    # Remove RevoSparkFifoFromSpark
    fifofrom=${value/RevoSparkFifoToSpark/RevoSparkFifoFromSpark}
    rm -f ${fifofrom}*

    SHELL_STARTER_PREFIX=start-spark-shell.${SHELL_ID}
    SHELL_STARTER_PID=$(pgrep -f "${SHELL_STARTER_PREFIX}")
    trace "SHELL_STARTER_PID=${SHELL_STARTER_PID}"

    if [ -n "${SHELL_STARTER_PID}" ]; then
        SHELL_PID=$(pgrep -P "${SHELL_STARTER_PID}" -f java)
        trace "SHELL_PID=${SHELL_PID}"

        # if the Spark app is running, send SIGTERM to it
        if [ -n "${SHELL_PID}" ]; then
            # Send SIGTERM signal to spark submit to exit
            kill -TERM ${SHELL_PID}
            # We use parentheses around the command with "&" instead of just "&" to
            # avoid messing up stdout in the user's R/python session; enclosing a command
            # with parentheses is called "command grouping" and it starts a new shell
            # to evaluate the group of commands; see section 3.2.4.3 in
            # https://www.gnu.org/software/bash/manual/bash.html#Lists
            #
            ((sleep 10s; if [ $(ps -o pid= -p ${SHELL_PID} | wc -l) -gt 0 ]; then kill -KILL ${SHELL_PID}; fi) &)
        else
            trace "spark app is not running"
        fi
    else
        trace "spark starter is not running"
    fi
    exit 0
fi

if [ "${mode}" == "clean" ]; then
    # in 'clean' mode, we do nothing if shell is already running,
    # otherwise, we kill all running apps with same R session

    trace "clean mode"

    # build Spark shell id and get shell pid if already running
    SHELL_ID="${SHELL_PREFIX}-${user}-${sessionPid}-${appId}"

    SHELL_STARTER_PREFIX=start-spark-shell.${SHELL_ID}
    SHELL_STARTER_PID=$(pgrep -f "${SHELL_STARTER_PREFIX}")
    trace "SHELL_STARTER_PID=${SHELL_STARTER_PID}"

    # if there is a Spark starter running, do nothing
    if [ ! -z "${SHELL_STARTER_PID}" ]; then
        exit 0
    fi

    # switch to kill (session) mode
    mode="kill"
    value="session"
fi

if [ "${mode}" == "kill" ]; then
    # in 'kill' mode, we choose to kill all running apps with same
    # user or with same R/python session based on value of $value

    trace "kill mode"

    # build shell clean prefix
    SHELL_CLEAN_PREFIX="NA"
    if [ "${value}" == "user" ]; then
        SHELL_CLEAN_PREFIX="${SHELL_PREFIX}-${user}"
    fi
    if [ "${value}" == "session" ]; then
        SHELL_CLEAN_PREFIX="${SHELL_PREFIX}-${user}-${sessionPid}"
    fi
    if [ "${value}" == "app" ]; then
        SHELL_CLEAN_PREFIX="${SHELL_PREFIX}-${user}-${sessionPid}-${appId}"
        if [ "${wait}" == "FALSE" ] || [ "${wait}" == "false" ]; then
            SHELL_CLEAN_PREFIX="${SHELL_CLEAN_PREFIX}-${jobId}"
        fi
    fi

    trace "clean-prefix=${SHELL_CLEAN_PREFIX}"
    if [ "${SHELL_CLEAN_PREFIX}" == "NA" ]; then
        exit 0
    fi

    SHELL_STARTER_CLEAN_PREFIX=start-spark-shell.${SHELL_CLEAN_PREFIX}
    SHELL_STARTER_PID_TO_CLEAN=$(pgrep -f "${SHELL_STARTER_CLEAN_PREFIX}")

    if [ -z "${SHELL_STARTER_PID_TO_CLEAN}" ]; then
        exit 0
    fi

    # kill spark app and remove fifo
    for shellstarterpid in ${SHELL_STARTER_PID_TO_CLEAN}; do
        trace "shell-starter-pid=${shellstarterpid}"

        shellstartercmd=$(ps --no-heading fwwp ${shellstarterpid})
        trace "shell-starter-cmd=${shellstartercmd}"

        shellpid=$(pgrep -P "${shellstarterpid}" -f java)
        trace "shell-pid=${shellpid}"
        if [ ! -z "${shellpid}" ]; then
            trace "kill -9 ${shellpid}"
            kill -9 ${shellpid}
            sleep 0.05
        fi

        fifoto=$(expr match "${shellstartercmd}" '.* \(.*RevoSparkFifoToSpark\S*\).*')
        if [ ! -z "${fifoto}" ]; then
            trace "rm -f $fifoto"
            rm -f ${fifoto}
        fi

        fifofrom=$(expr match "${shellstartercmd}" '.* \(.*RevoSparkFifoFromSpark\S*\).*')
        if [ ! -z "${fifofrom}" ]; then
            trace "rm -f ${fifofrom}*"
            rm -f ${fifofrom}*
        fi
    done
fi
