#!/bin/bash

# TODO: deprecate this file when 9.0.1 is out of support

# example: /usr/lib64/MRO-for-MRS-8.0.0/R-3.2.2/lib64/R/library/RevoScaleR/utils/spark/RevoSparkHPCLauncher \
#/var/RevoShare/hue/RevoSparkFifoToSpark \
#/var/RevoShare/hue/RevoSparkFifoFromSpark \
#2 \
#/user/RevoShare/hue/dfs-045955C066CB4FB7B7D5D6E58CF66308/output \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/input.R \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/00.RInData \
#/var/RevoShare/hue/cluster-045955C066CB4FB7B7D5D6E58CF66308/scaleR.sh \
# /usr/bin/Revoscript \
# default 0 \
# dev ASFDKLS-sessionID \
# 600 \
# 2 1g 1g 1000 false 2

# Absolute, symlink-resolved directory that contains this script; sibling
# helper scripts are located relative to it. Every expansion is quoted so a
# path containing whitespace survives, and pwd only runs when cd succeeded
# (otherwise SCRIPT_DIR is left empty instead of silently pointing at the
# caller's working directory).
SCRIPT_DIR=$(cd "$(dirname "$(readlink -nf "$0")")" && pwd -P)
# Source the RevoTracing file that sits next to this script
# (presumably where `trace` is defined -- confirm in RevoTracing).
. "${SCRIPT_DIR}/RevoTracing"

# Hand the complete argument list, joined into a single string, to trace.
trace "$*"

# Script parameters.
#
# The caller passes every value positionally. The names below are listed in
# that positional order; each one receives the current $1 and the argument
# list is then shifted by one. A missing argument leaves its variable empty.
#
# The Spark settings arrive in this order:
#   executorCores        --executor-cores
#   executorMem          --executor-memory
#   driverMem            --driver-memory
#   executorOverheadMem  --conf spark.yarn.executor.memoryOverhead
#   numExecutors         --num-executors
#   clusterGUIDDir
#   extraSparkConfig     extra configuration
# NOTE(review): no positional slot is read for "--conf spark.speculation";
# confirm that the caller does not pass one.
for _argName in \
    fifoToSpark fifoFromSpark numTasks \
    dfsOutputPath inputDotRPath dotRInDataPath scaleRDotShPath \
    rPath nameNode nameNodePort user \
    rSessionPid appId jobId wait idleTimeout \
    executorCores executorMem driverMem executorOverheadMem \
    numExecutors clusterGUIDDir; do
    printf -v "${_argName}" '%s' "$1"
    shift
done
unset _argName

# The last value is read without a trailing shift, so it remains in $1.
extraSparkConfig=$1





#TODO: This hack is only a temporary fix for the 8.0.3 client until the real fix is backported to it.
# After the backport, we should remove this logic.

# Succeeds when $1 starts with a Windows drive prefix: one drive letter (either
# case), a colon, then a forward slash or a backslash -- e.g. "C:/" or "d:\".
# Arguments: $1 - path to inspect
# Returns:   0 when the prefix matches, 1 otherwise
function isWindowsDrivePath() {
    # Kept in a variable so the backslash inside the bracket expression is
    # handed to the regex engine unchanged.
    local drivePrefix='^[A-Za-z]:[/\\]'
    [[ $1 =~ ${drivePrefix} ]]
}

# An interpreter path that starts with a Windows drive prefix is replaced by a fixed path.
if isWindowsDrivePath "${rPath}"; then
  rPath="/usr/bin/Revo64"
fi

# Break the incoming paths into the directory / file-name pieces used later.

# DFS output path: containing directory and file name.
hdfsWorkDir=$(dirname "${dfsOutputPath}")
dfsOutputFile=$(basename "${dfsOutputPath}")

# scaleR.sh path: containing directory (where the status and output files of
# this script are written) and file name.
nativeGuidDir=$(dirname "${scaleRDotShPath}")
scaleRDotShFile=$(basename "${scaleRDotShPath}")

# Only the file names of the remaining two paths are needed.
inputDotRFile=$(basename "${inputDotRPath}")
dotRInDataFile=$(basename "${dotRInDataPath}")


# Records the job status by overwriting jobStatus.txt inside ${nativeGuidDir}.
# Globals:   nativeGuidDir (read) - directory that receives jobStatus.txt
# Arguments: $1 - status text to store (this script passes "running",
#                 "failed" or "finished")
# The status and the target path are quoted so the text is written verbatim
# (no word splitting or globbing) and a directory name containing whitespace
# does not trigger an "ambiguous redirect" error.
function hpcReturn() {
    printf '%s\n' "$1" > "${nativeGuidDir}/jobStatus.txt"
}

# Append this script's PID to MergedOut0.txt in ${nativeGuidDir}, then record
# the "running" status. The redirect target is quoted so a directory name
# containing whitespace does not cause an "ambiguous redirect" error.
echo " Running job: $$" >> "${nativeGuidDir}/MergedOut0.txt"
hpcReturn "running"

# Arguments handed to RevoSparkSubmitLauncher, in positional order.
submitLauncherArgs=(
    "$fifoToSpark" "$fifoFromSpark" "${rPath}"
    "$nameNode" "$nameNodePort" "$user"
    "${rSessionPid}" "${appId}" "${jobId}"
    "${wait}" "${idleTimeout}"
    "$executorCores" "$executorMem"
    "$driverMem" "$executorOverheadMem"
    "$numExecutors" "$extraSparkConfig"
    "0"  # passed to the loggingLevel parameter on the Spark driver program side
)

# A non-zero exit status from the launcher marks the job as failed and ends
# this script with status 1.
if ! "${SCRIPT_DIR}/RevoSparkSubmitLauncher" "${submitLauncherArgs[@]}"; then
    hpcReturn "failed"
    echo "Error while running RevoSparkSubmitLauncher"
    exit 1
fi

# Create command id for jobFifoFromSpark communication: six random characters
# produced by mktemp (the file itself is never created because of -u).
commandId=$(mktemp -u FILENAMEXXXXXX | sed 's/FILENAME//g')
jobFifoFromSpark=${fifoFromSpark}-${commandId}
rm -f "$jobFifoFromSpark"
# Without the response FIFO the read below would return an empty response,
# which would be reported as "finished"; fail explicitly instead.
if ! mkfifo "$jobFifoFromSpark"; then
    hpcReturn "failed"
    echo "Error while creating fifo ${jobFifoFromSpark}"
    exit 1
fi

SPARK_COMMAND="start-hpc clusterGUIDDir=$clusterGUIDDir numTasks=$numTasks hdfsWorkDir=$hdfsWorkDir dfsOutputFileName=$dfsOutputFile inputDotRFileName=$inputDotRFile dotRInDataFileName=$dotRInDataFile scaleRDotShFileName=$scaleRDotShFile loggingLevel=0 commandId=${commandId}"

trace "command for Spark: ${SPARK_COMMAND}"
# Open the named pipe read-write as file descriptor 4 so that writing the
# command does not block this script when Spark crashed before the fifo is read.
# Simply backgrounding the write with & is not an option because it can cause
# a zombie HPALauncher process.
# If the pipe cannot be opened the command can never be delivered, and waiting
# for a response would block forever, so report the failure right away.
if ! exec 4<> "${fifoToSpark}"; then
    rm -f "$jobFifoFromSpark"
    hpcReturn "failed"
    echo "Error while opening fifo ${fifoToSpark}"
    exit 1
fi
echo "${SPARK_COMMAND}" >&4
trace "command sent, waiting for response"

# wait for spark to finish: the read returns once the writing side of the
# response fifo has been closed.
SPARK_RESPONSE=$(cat "$jobFifoFromSpark")
rm -f "$jobFifoFromSpark"
trace "response from Spark: ${SPARK_RESPONSE}"
# Any response containing the text ERROR marks the job as failed; the response
# is printed verbatim to stdout.
if [[ "${SPARK_RESPONSE}" =~ "ERROR" ]]; then
    hpcReturn "failed"
    printf '%s\n' "${SPARK_RESPONSE}"
    exit 1
fi

hpcReturn "finished"
