#!/bin/bash

# example: /usr/lib64/Revo-7.3/R-3.1.3/lib64/R/library/RevoScaleR/utils/RevoSparkHPALauncher \
#/var/RevoShare/hue/RevoSparkFifoToSpark \
#/var/RevoShare/hue/RevoSparkFifoFromSpark \
#TRUE \
#spark-no-header-xUwb1 \
#spark-create-input-data-rdd-xUwb1 \
#/user/RevoShare/hue/4D4DB59678D24C9F8099540A36CFAB3E/.input \
#/user/RevoShare/hue/4D4DB59678D24C9F8099540A36CFAB3E/IRO.iro \
#hdfs:///tmp/tab2b4.csv \
#default 0 /usr/bin/Revoscript

# Resolve the physical directory containing this script (readlink -f follows
# symlinks) so the helper scripts that live next to it can be located
# regardless of the caller's working directory.
# Every expansion is quoted: an install path containing whitespace would
# otherwise be word-split before reaching dirname and resolve incorrectly.
SCRIPT_DIR=$(cd "$(dirname "$(readlink -nf "$0")")" && pwd -P)

# Load the tracing helpers (presumably the source of the trace function used
# throughout this script - confirm against RevoTracing).
. "${SCRIPT_DIR}/RevoTracing"

# Record the full argument list this launcher was invoked with.
trace "$*"

# TODO when hard-coded positions are replaced by named arguments (getopt)
#      an update will be needed in UpdateInputFileSpecInSparkSystemCommand()
#      in ExaCore/InteractiveAnalysis.cpp, which currently assumes parameters'
#      positions to be hardcoded

# script parameters
#
# Arguments are strictly positional. The first 35 are consumed in order, one
# shift per argument; the 36th (extraSparkConfig) is read last and is left in
# place as $1. Arguments that are not supplied yield empty variables.
#
# Notes on individual parameters:
#
#  * inDataPath, nameNode and nameNodePort (5, 8, 9) are expected at these
#    positions by UpdateInputFileSpecInSparkSystemCommand.
#
#  * sparkDataScenario (12) says whether we load data from:
#      1. XDF in HDFS - "spark-create-xdf-rdd"
#      2. one-time TXT data in HDFS - "spark-create-text-rdd"
#      3. multi-time TXT data in HDFS - "spark-import-text-to-xdf-rdd:{Path}"
#         and "spark-use-xdf-rdd"
#    See rxCluster.R:rxExecJob() for more details.
#
#  * The order of spark configuration (parameters 20 onwards):
#      --executor-cores
#      --executor-memory
#      --driver-memory
#      --conf spark.yarn.executor.memoryOverhead
#      --conf spark.speculation
#      --num-executors
#      extra configuration
#
#  * loggingLevel (31) is kept ahead of extraSparkConfig (36) because the
#    core engine parses the system command by white space.
#    TO DO: improve the parser of system command in core engine to remove
#    the dependency on the order of the parameters
launcherParamNames=(
    fifoToSpark              # 1
    fifoFromSpark            # 2
    isInputXdf               # 3
    dotInputPath             # 4
    inDataPath               # 5, expected in UpdateInputFileSpecInSparkSystemCommand
    finalIROPath             # 6
    rPath                    # 7
    nameNode                 # 8, expected in UpdateInputFileSpecInSparkSystemCommand
    nameNodePort             # 9, expected in UpdateInputFileSpecInSparkSystemCommand
    algorithm                # 10
    textHeader               # 11
    sparkDataScenario        # 12
    hdfsWorkDir              # 13
    user                     # 14
    rSessionPid              # 15
    appId                    # 16
    jobId                    # 17
    wait                     # 18
    idleTimeout              # 19
    executorCores            # 20
    executorMem              # 21
    driverMem                # 22
    executorOverheadMem      # 23
    numExecutors             # 24
    clusterGUIDDir           # 25
    sparkReduceMethod        # 26
    inXdfUuid                # 27
    useRxSparkDataOutput     # 28
    outXdfUuid               # 29
    outDataPath              # 30
    loggingLevel             # 31
    skipLaunchSpark          # 32
    tmpFSWorkDir             # 33
    master                   # 34
    masterInterpreterPath    # 35
)

# Assign the current $1 to each named variable in turn, then advance.
for launcherParamName in "${launcherParamNames[@]}"; do
    printf -v "${launcherParamName}" '%s' "$1"
    shift
done
unset launcherParamNames launcherParamName

# 36: the last parameter is read without a trailing shift.
extraSparkConfig=$1

trace "hdfsWorkDir = $hdfsWorkDir"

# Optionally launch Spark through the RevoSparkSubmitLauncher helper located
# next to this script. The launch is skipped when skipLaunchSpark is "TRUE"
# or "true"; any other value (including empty) runs the launcher.
#
# TODO: deprecate the Spark launch code when 9.1.0 is out of support.
# It is only used by the old HPA workflow.
if [[ "${skipLaunchSpark}" == "TRUE" || "${skipLaunchSpark}" == "true" ]]; then
    trace "Skip spark launch due to using other backends. (sparklyr or sparkR[not integrated yet]). Or it is running new HPA workflow. New workflow no need to launch spark here."
else
    # check if shell is running, and start it if it is not
    # NOTE: the launcher takes its arguments positionally and in a different
    # order than this script receives them (extraSparkConfig comes before
    # loggingLevel here).
    if ! "${SCRIPT_DIR}/RevoSparkSubmitLauncher" "$fifoToSpark" "$fifoFromSpark" "${rPath}" \
                                               "${nameNode}" "${nameNodePort}" "${user}" \
                                               "${rSessionPid}" "${appId}" "${jobId}" \
                                               "${wait}" "${idleTimeout}" \
                                               "${executorCores}" "${executorMem}" \
                                               "${driverMem}" "${executorOverheadMem}" \
                                               "${numExecutors}" "${extraSparkConfig}" \
                                               "${loggingLevel}" "${tmpFSWorkDir}" "${master}" "${masterInterpreterPath}"; then
        # Any non-zero launcher status aborts this script with status 1.
        echo "Error while running RevoSparkSubmitLauncher"
        exit 1
    fi
fi

# Remove the IRO left over from the previous run so that it cannot be
# mistaken for the result of this one.
trace "removing last run's IRO $finalIROPath"
rm -rf "${finalIROPath}"

# Build a per-command id: let mktemp (dry-run mode, nothing is created)
# fill in the XXXXXX part of the template, then drop the fixed FILENAME
# prefix so only the generated suffix remains.
generatedName=$(mktemp -u FILENAMEXXXXXX)
commandId=${generatedName//FILENAME/}

# The response for this command is looked for at "<fifoFromSpark>-<commandId>";
# make sure no stale file of that name exists before the command is sent.
jobFifoFromSpark="${fifoFromSpark}-${commandId}"
rm -f "$jobFifoFromSpark"

# Compose the command line that will be sent to Spark.
#
# Two flavours exist and differ only in the verb, the key used for the input
# data path, and a few flavour-specific fields in the middle:
#   * isInputXdf is "TRUE"/"true" -> "start"      (XDF data)
#   * anything else               -> "start-text" (text data - load it to RDD)
#
# TODO Should we seperate spark data frame to use another command start-dataframe? Rather than sharing with start-text.
# TODO parameters cannot contain spaces - handle that (JSON?)
case "$isInputXdf" in
    TRUE|true)
        sparkVerb="start"
        dataPathKey="xdfDataPath"
        flavourFields="algorithm=${algorithm} hdfsWorkDir=${hdfsWorkDir} sparkDataScenario=$sparkDataScenario"
        ;;
    *)
        sparkVerb="start-text"
        dataPathKey="textDataPath"
        flavourFields="sparkDataScenario=$sparkDataScenario textHeader=$textHeader"
        ;;
esac

SPARK_COMMAND="${sparkVerb} clusterGUIDDir=$clusterGUIDDir dotInputPath=$dotInputPath ${dataPathKey}=$inDataPath finalIROPath=$finalIROPath ${flavourFields} sparkReduceMethod=$sparkReduceMethod inXdfUuid=${inXdfUuid} useRxSparkDataOutput=${useRxSparkDataOutput} outXdfUuid=${outXdfUuid} outDataPath=${outDataPath} loggingLevel=$loggingLevel commandId=$commandId"

# Send the command to Spark by writing it to fifoToSpark.
# The redirect target is quoted: an unquoted path containing whitespace or
# glob characters would be rejected by bash as an ambiguous redirect.
trace "command for Spark: ${SPARK_COMMAND}"
echo "${SPARK_COMMAND}" > "${fifoToSpark}"
trace "command sent, waiting for response"

# wait for spark to finish
# Completion is detected by polling (every 0.05s) until the per-command
# response path exists. Note that the test is -f, i.e. a regular file is
# expected despite the "fifo" naming.
# NOTE(review): there is no timeout - if the response never appears this
# loop polls forever.
trace "wait fifo: $jobFifoFromSpark"
until [ -f "$jobFifoFromSpark" ]; do
    sleep 0.05
done

# Read the whole response, then delete the file so it is not left behind.
trace "read fifo: $jobFifoFromSpark"
SPARK_RESPONSE=$(cat "$jobFifoFromSpark")
rm -f "$jobFifoFromSpark"

# Inspect Spark's response. Any response containing the literal text "ERROR"
# is treated as a failure: it is written to stdout and the script exits with
# status 1 (the .input file is then left in place).
trace "response from Spark: ${SPARK_RESPONSE}"
if [[ "${SPARK_RESPONSE}" == *ERROR* ]]; then
    # Emit the response verbatim. printf keeps leading/trailing whitespace on
    # every line (e.g. indented stack-trace lines) and cannot misread a line
    # such as "-n" as an option, unlike a read/echo loop.
    printf '%s\n' "${SPARK_RESPONSE}"
    exit 1
fi

# clean old .input file to avoid next iteration mistakenly using this
trace "removing $dotInputPath"
rm -rf "${dotInputPath}"
