#!/bin/bash

# NOTE(review): the example below invokes RevoSparkHPCLauncher and passes more
# arguments than this script reads. This script consumes only three positional
# arguments, in this order: fifoToSpark, fifoFromSparkPrefix, jsonPath.
# Confirm whether the example is stale and update it.
#
# example: /usr/lib64/MRO-for-MRS-8.0.0/R-3.2.2/lib64/R/library/RevoScaleR/utils/spark/RevoSparkHPCLauncher \
#/var/RevoShare/hue/RevoSparkFifoToSpark \
#/var/RevoShare/hue/RevoSparkFifoFromSpark \
#2 \
#/user/RevoShare/hue/dfs-045955C066CB4FB7B7D5D6E58CF66308/output \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/input.R \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/00.RInData \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/scaleR.sh \
# /usr/bin/Revoscript \
# default 0 \
# dev ASFDKLS-sessionID \
# 600 \
# 2 1g 1g 1000 false 2

# Resolve the directory that contains this script (symlinks resolved) so the
# tracing helpers can be sourced regardless of the caller's working directory.
# Every nested expansion is quoted so a path containing whitespace is passed
# through intact, and `&&` prevents `pwd` from reporting the caller's
# directory when `cd` fails.
SCRIPT_DIR=$(cd "$(dirname "$(readlink -nf "$0")")" && pwd -P)
. "${SCRIPT_DIR}/RevoTracing"

# Log the complete argument list as a single string.
# (trace is not defined in this file; presumably provided by RevoTracing.)
trace "$*"

# Spark init parameters (positional):
#   $1 fifoToSpark          - FIFO the command for Spark is written to
#   $2 fifoFromSparkPrefix  - prefix of the per-job FIFO the response is read from
#   $3 jsonPath             - path forwarded to Spark as "jsonPath=" in the command
fifoToSpark=$1
shift
fifoFromSparkPrefix=$1
shift
jsonPath=$1

# Directory that holds jsonPath; jobStatus.txt is written into it.
nativeGuidDir=$(dirname "$jsonPath")


# Record the job's current state by overwriting jobStatus.txt.
# Globals:   nativeGuidDir (read) - directory that receives jobStatus.txt
# Arguments: $1 - status text to record (this file passes "running",
#                 "failed" and "finished")
# Outputs:   replaces ${nativeGuidDir}/jobStatus.txt with the status text
#            followed by a newline
function jobReturn() {
    # Both expansions are quoted: an unquoted redirect target that contains
    # whitespace is an "ambiguous redirect" error (nothing gets written), and
    # an unquoted $1 would be word-split and glob-expanded.
    printf '%s\n' "$1" > "${nativeGuidDir}/jobStatus.txt"
}

jobReturn "running"

# Create a command id for jobFifoFromSpark communication. The id names the
# per-job response FIFO and is sent to Spark as "commandId=" in the command.
commandId=$(mktemp -u FILENAMEXXXXXX | sed 's/FILENAME//g')
jobFifoFromSpark=${fifoFromSparkPrefix}-${commandId}

# Remove the response FIFO on every exit path so it is not left on disk when
# the script fails or is terminated while waiting.
# NOTE(review): this replaces any EXIT trap already installed (e.g. by the
# sourced RevoTracing file) - confirm none exists.
trap 'rm -f -- "$jobFifoFromSpark"' EXIT

rm -f -- "$jobFifoFromSpark"
if ! mkfifo -- "$jobFifoFromSpark"; then
    # Without the FIFO the read below fails immediately with an empty
    # response, which would be reported as "finished".
    jobReturn "failed"
    printf 'ERROR: cannot create response FIFO %s\n' "$jobFifoFromSpark" >&2
    exit 1
fi

SPARK_COMMAND="job jsonPath=${jsonPath} commandId=${commandId}"

trace "command for Spark: ${SPARK_COMMAND}"
# Open the named pipe read-write as file descriptor 4 so that sending the
# command does not block the script when Spark crashes before the FIFO is read.
# Simply backgrounding the write with & is avoided because it can leave a
# zombie HPALauncher process. fd 4 stays open until the script exits.
# If the pipe cannot be opened or written, fail now instead of waiting forever
# for a response that cannot arrive.
if ! exec 4<> "${fifoToSpark}" || ! printf '%s\n' "${SPARK_COMMAND}" >&4; then
    jobReturn "failed"
    printf 'ERROR: cannot send command to Spark through %s\n' "${fifoToSpark}" >&2
    exit 1
fi
trace "command sent, waiting for response"

# Wait for Spark to finish: reading the FIFO blocks until a writer has written
# the response and closed its end.
SPARK_RESPONSE=$(cat -- "$jobFifoFromSpark")
rm -f -- "$jobFifoFromSpark"
trace "response from Spark: ${SPARK_RESPONSE}"
if [[ "${SPARK_RESPONSE}" == *"ERROR"* ]]; then
    jobReturn "failed"
    # Relay the response line by line to both stdout and stderr. IFS= keeps
    # leading/trailing whitespace of each line; printf is safe for lines that
    # look like echo options.
    while IFS= read -r line; do
        printf '%s\n' "$line"
        printf '%s\n' "$line" >&2
    done <<<"${SPARK_RESPONSE}"
    exit 1
fi

jobReturn "finished"
