#!/bin/bash

# Canonical (symlink-resolved) path of this script and the directory holding
# it. BASEDIR is the anchor for the sibling R/ and Python/ helper scripts.
SCRIPT="$(readlink -f "$0")"
BASEDIR="$(dirname "${SCRIPT}")"

#######################################
# Print the usage text for this script.
# Arguments: none
# Outputs:   writes the help text to stdout (callers redirect it as needed)
#######################################
function print_help
{
# The delimiter is quoted so the help text is emitted literally and can never
# be altered by parameter or command expansion.
cat << 'EOF'
  SYNOPSIS:
      mrs-hadoop-job --help
      mrs-hadoop-job COMMAND [OPTION]

  DESCRIPTION:
      The job level entry point script to parse input parameters and dispatch to other scripts.

  COMMAND:
      path
              Get mrs-hadoop-job script location path
      connect [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--dry-run] [--server-type server_type] [--interpreter-path interpreter_path]
              Start and connect new Spark application.
      disconnect [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--dry-run] [--server-type server_type] [--interpreter-path interpreter_path] [--scope scope]
              Disconnect and kill Spark application.
      applist [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--dry-run] [--server-type server_type] [--interpreter-path interpreter_path] [--scope scope]
              List running Spark applications.
      run [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--dry-run] [--server-type server_type] [--interpreter-path interpreter_path]
              Start the job. Start Spark application if necessary. Return job id in stdout.
      query [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--interpreter-path interpreter_path] [--dry-run] [--status]
              Get job id from working dir and return job status in stdout.
      monitor [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--interpreter-path interpreter_path] [--dry-run] [--msg-level msg_level] [--node node] [--max-line max_line] [--start-line start_line]
              Monitor a job, create stdout/stderr/both chunk.
      kill [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--interpreter-path interpreter_path] [--dry-run]
              Get job id from working dir and kill the job.
      clean [--version version] [--dir job_work_dir] [--output job_output] [--stream stream_type] [--trace-level trace_level] [--interpreter-path interpreter_path] [--dry-run]
              Clean resources.

  OPTION:
      --version                 [Required] Protocol version provided by client.
      --dir                     [Required] Job stage directory. Client job files will be copied into this folder.
      --trace-level             [Required] From value 0-7 for indicating trace level. Save trace/log info to protocol.log file. (Not implemented yet)
      --server-type             [Optional] The type of backend server (R/Python) to use. Default value is "R".
      --output                  [Optional] Output file prefix OUTPUT. Redirect stdout/stderr to OUTPUT.stdout and OUTPUT.stderr. If not specified, then use protocol.stdout and protocol.stderr.
      --stream                  [Optional] Console output stream types: SINGLE-STREAM or TWO-STREAM. By default it is SINGLE-STREAM. [Not implemented yet]
      --dry-run                 [Optional] Only print important operations but not execute them. (Not implemented yet)
      --interpreter-path        [Optional] Select which interpreter to use.
      --status                  Required option for command 'query'. Indicate query job status.
      --msg-level               Required option for command 'monitor'. Indicate the type(ERROR/INFO) of monitor message to generate on server.
      --node                    Optional option for command 'monitor'. Indicate the node (master/worker) to monitor. (Not implemented yet)
      --max-line                Required option for command 'monitor'. Indicate the line of log to monitor. (Number or 'all')
      --start-line              Optional option for command 'monitor'. Indicate the start line of log to monitor. (Not implemented yet)
      --scope                   Optional option for command 'disconnect' and 'applist'. Indicate the scope of closing spark apps. (user or session)

  RETURN CODE:
      0 for success.
      < 0 for error.
      > 0 for warning.
EOF
}

#######################################
# Replay the buffered protocol streams to the caller and terminate the
# protocol frame.
# Globals:   STDOUT (read) - path of the buffered stdout file
#            STDERR (read) - path of the buffered stderr file
# Outputs:   contents of STDOUT, then contents of STDERR, then the
#            "%PROTOCOL-END%" marker, all on stdout
#######################################
function print_std()
{
  # Quoted so paths containing whitespace stay a single argument and an empty
  # variable yields a cat error instead of cat silently blocking on stdin.
  cat -- "${STDOUT}"
  cat -- "${STDERR}"
  echo "%PROTOCOL-END%"
}

#######################################
# Flush the buffered protocol output via print_std, then end the script.
# Arguments: $1 - exit status to terminate with
# Outputs:   whatever print_std writes to stdout
#######################################
print_and_exit() {
  print_std
  # Left unquoted on purpose: with no argument this degrades to a bare `exit`.
  exit $1
}

#######################################
# Abort the script through print_and_exit when the previous command failed.
# Arguments: $1 - exit status of the command that just ran (callers pass $?)
# Returns:   normally only when $1 is exactly "0"; any other value, including
#            an empty or missing one, is treated as a failure so that an
#            unknown status can never be mistaken for success.
#######################################
function print_and_exit_if_fail()
{
  local last_command_exit_code=${1-}
  # [[ ]] with a quoted operand: an empty status no longer turns the test into
  # a syntax error that is silently evaluated as "no failure".
  if [[ "${last_command_exit_code}" != "0" ]]; then
    print_and_exit -1
  fi
}

# --help short-circuits everything else.
if [[ "${1-}" == "--help" ]]; then
  print_help
  exit 0
fi

# The first positional argument is the command word; the rest are options.
command=${1-}
if (( $# > 0 )); then
  shift
fi

# Exact-match validation. A case pattern list cannot be satisfied by
# multi-word values such as "run query", which a substring test would accept.
case "${command}" in
  path|connect|disconnect|applist|run|query|monitor|kill|clean)
    ;;
  *)
    echo "Invalid command: ${command}"
    print_help
    # bash reports `exit -1` to the parent process as status 255.
    exit -1
    ;;
esac

# 'path' needs no work directory: answer with an inline protocol frame.
if [[ "${command}" == "path" ]]; then
  echo "%PROTOCOL-STDOUT%"
  echo "${SCRIPT}"
  echo "%PROTOCOL-STDERR%"
  echo "%PROTOCOL-END%"
  exit 0
fi

#######################################
# Parse the option list that follows the command word.
# Globals (written): dir, version, serverType, msgLevel, node, maxLine,
#                    startLine, status, output, stream, traceLevel, scope,
#                    interpreterPath, dryRun
# Arguments: the option words, normally "$@" of the script
# Notes:
#   - Iterates on the argument count, so an empty-string argument no longer
#     ends parsing early and drops the options that follow it.
#   - A value-taking option given as the last word receives an empty value.
#   - Unknown options are ignored for backward compatibility.
#######################################
parse_options() {
  local opt
  while (( $# > 0 )); do
    opt=$1
    shift
    case "${opt}" in
      --dir)              dir=${1-} ;;
      --version)          version=${1-} ;;
      --server-type)      serverType=${1-} ;;
      --msg-level)        msgLevel=${1-} ;;
      --node)             node=${1-} ;;
      --max-line)         maxLine=${1-} ;;
      --start-line)       startLine=${1-} ;;
      --output)           output=${1-} ;;
      --stream)           stream=${1-} ;;
      --trace-level)      traceLevel=${1-} ;;
      --scope)            scope=${1-} ;;
      --interpreter-path) interpreterPath=${1-} ;;
      --status)           status="true"; continue ;;
      --dry-run)          dryRun="true"; continue ;;
      *)                  continue ;; # Ignore unknown option for backward compatibility
    esac
    # Only value-taking options reach this point: consume the value word.
    if (( $# > 0 )); then
      shift
    fi
  done
}

parse_options "$@"

# Validate the job work directory. The protocol files do not exist yet at this
# point, so errors are reported as an inline protocol frame on stdout.
# ${dir} is quoted everywhere so a path containing whitespace is tested as one
# word instead of turning the test into a syntax error that skips the check.
if [[ -z "${dir}" ]]; then
  printf '%s\n' \
    "%PROTOCOL-STDOUT%" \
    "%PROTOCOL-STDERR%" \
    "Work directory is not specified. Please add --dir." \
    "%PROTOCOL-END%"
  exit -1
fi

if [[ ! -d "${dir}" ]]; then
  printf '%s\n' "%PROTOCOL-STDOUT%" "%PROTOCOL-STDERR%"
  if [[ "${command}" == "clean" ]]; then
    # clean may be called several times; once the directory is gone there is
    # nothing left to do, so report success with an empty frame.
    printf '%s\n' "%PROTOCOL-END%"
    exit 0
  fi
  printf '%s\n' \
    "Job work directory '${dir}' does not exist on server. Probably it has been cleaned after job completion or cancellation." \
    "%PROTOCOL-END%"
  exit -1
fi

# Select the files that buffer this invocation's protocol output. The prefix
# is the --output value, or "protocol" when --output is absent or empty.
#   STDOUT: only data the client expects.
#   STDERR: all error and warning messages.
STDOUT="${dir}/${output:-protocol}.stdout"
STDERR="${dir}/${output:-protocol}.stderr"

# Start both files afresh with their stream markers. The targets are quoted
# because an unquoted redirect target containing whitespace is rejected by
# bash as an ambiguous redirect.
echo "%PROTOCOL-STDOUT%" > "${STDOUT}"
echo "%PROTOCOL-STDERR%" > "${STDERR}"

# The protocol version must look like "<digits>.<digits>". An empty or missing
# value fails the regex as well, so no separate emptiness test is needed.
if [[ ! "${version}" =~ ^[[:digit:]]+\.[[:digit:]]+$ ]]; then
  echo "Invalid version. It should be [0-9]+.[0-9]+" >> "${STDERR}"
  print_help >> "${STDERR}"
  print_and_exit -1
fi

# The server type defaults to "R" when absent or empty; only "R" and "Python"
# are accepted.
case "${serverType:=R}" in
  R|Python)
    ;;
  *)
    echo "serverType should be R or Python" >> "${STDERR}"
    print_and_exit -1
    ;;
esac

#######################################
# Run the job check script and then the job init script with the selected
# interpreter, and afterwards make the work directory usable by its owner.
# Globals:
#   serverType      (read)  "R" selects the R scripts, anything else Python
#   interpreterPath (read/written) defaulted when empty: "Revo64" for R,
#                   "/usr/bin/mlserver-python" for Python
#   BASEDIR, dir, STDERR (read)
# Arguments:
#   $1 - command name forwarded to the check/init scripts
# Outputs:
#   stdout and stderr of both scripts are appended to the STDERR file
# Returns:
#   does not return when a script fails (print_and_exit_if_fail ends the run)
#######################################
function check_and_init()
{
  local commandToExecute=$1
  local installRoot="${BASEDIR}/.."
  local -a checkCmd initCmd

  # The command lines are built as arrays and every expansion is quoted so
  # paths containing whitespace or glob characters are passed through intact.
  if [[ "${serverType}" == "R" ]]; then
    if [[ -z "${interpreterPath}" ]]; then
      interpreterPath="Revo64" # default value for keeping backward compatibility
    fi
    checkCmd=("${interpreterPath}" --vanilla --slave -q -f "${installRoot}/R/job-check.R" --args)
    initCmd=("${interpreterPath}" --vanilla --slave -q -f "${installRoot}/R/job-init.R" --args)
  else
    if [[ -z "${interpreterPath}" ]]; then
      interpreterPath="/usr/bin/mlserver-python" # default value for keeping backward compatibility
    fi
    checkCmd=("${interpreterPath}" "${installRoot}/Python/JobCheck.py")
    initCmd=("${interpreterPath}" "${installRoot}/Python/JobInit.py")
  fi

  "${checkCmd[@]}" "${dir}" "${installRoot}" "${commandToExecute}" >> "${STDERR}" 2>&1
  print_and_exit_if_fail $?

  "${initCmd[@]}" "${dir}" "${installRoot}" "${commandToExecute}" >> "${STDERR}" 2>&1
  print_and_exit_if_fail $?

  # Owner needs read/write on everything and execute on the *.sh scripts in
  # the work directory; the glob itself must stay outside the quotes.
  chmod -R u+rw "${dir}"
  chmod u+x "${dir}"/*.sh
}

# connect / disconnect / applist / run: each one initialises the work
# directory through check_and_init, runs one script from that directory with
# stdout and stderr appended to the protocol files, and then exits. Every arm
# terminates the script. All expansions are quoted so whitespace in the work
# directory or the scope value cannot split a word.
case "${command}" in
  connect)
    check_and_init "${command}"
    "${dir}/spark-connect.sh" 1>>"${STDOUT}" 2>>"${STDERR}"
    print_and_exit_if_fail $?
    print_and_exit 0
    ;;

  disconnect|applist)
    if [[ -z "${scope}" ]]; then
      echo "scope value is required. (user/session)" >> "${STDERR}"
      print_help >> "${STDERR}"
      print_and_exit -1
    fi

    check_and_init "${command}"
    if [[ "${command}" == "disconnect" ]]; then
      "${dir}/spark-disconnect.sh" --scope "${scope}" 1>>"${STDOUT}" 2>>"${STDERR}"
    else
      "${dir}/spark-list.sh" --scope "${scope}" 1>>"${STDOUT}" 2>>"${STDERR}"
    fi
    print_and_exit_if_fail $?
    print_and_exit 0
    ;;

  run)
    check_and_init "${command}"
    "${dir}/start-job.sh" 1>>"${STDOUT}" 2>>"${STDERR}"
    print_and_exit_if_fail $?
    print_and_exit 0
    ;;
esac

# query: report the content of jobStatus.txt from the work directory.
# --status is the only supported query. Without it the script used to fall off
# the end with exit 0 and no "%PROTOCOL-END%" marker; that is now an explicit
# protocol error.
if [[ "${command}" == "query" ]]; then
  if [[ -z "${status}" ]]; then
    echo "status option is required for command 'query'." >> "${STDERR}"
    print_help >> "${STDERR}"
    print_and_exit -1
  fi

  if [[ -f "${dir}/jobStatus.txt" ]]; then
    cat -- "${dir}/jobStatus.txt" >> "${STDOUT}"
    print_and_exit 0
  fi

  echo "Fail to locate job status file on server to query job status." >> "${STDERR}"
  print_and_exit -1
fi

# monitor: validate --msg-level and --max-line, then run create-job-output.sh
# from the work directory.
if [[ "${command}" == "monitor" ]]; then
  # Exact match only; a substring test would also accept "ERROR INFO".
  case "${msgLevel}" in
    ERROR|INFO)
      ;;
    *)
      echo "Invalid msg-level value. It should be ERROR/INFO." >> "${STDERR}"
      print_help >> "${STDERR}"
      print_and_exit -1
      ;;
  esac

  if [[ -z "${maxLine}" ]]; then
    echo "maxLine value is required." >> "${STDERR}"
    print_help >> "${STDERR}"
    print_and_exit -1
  fi

  if [[ "${maxLine}" != "all" ]]; then
    # `[ x -eq x ]` fails for anything that is not an integer, which makes it
    # a cheap number check; its own diagnostic is discarded. The single-bracket
    # form is kept deliberately: [[ -eq ]] would evaluate the operand as an
    # arithmetic expression.
    if ! [ "${maxLine}" -eq "${maxLine}" ] 2>/dev/null; then
      echo "Invalid maxLine value. It should be number like 128 or 'all'." >> "${STDERR}"
      print_help >> "${STDERR}"
      print_and_exit -1
    fi
  fi

  # NOTE(review): this redirect truncates the STDERR file, dropping the
  # "%PROTOCOL-STDERR%" marker written during setup, whereas connect/run use
  # append (>>). Kept as is; presumably create-job-output.sh produces the
  # complete stderr chunk itself. Confirm before changing.
  bash "${dir}/create-job-output.sh" --msg-level "${msgLevel}" --max-line "${maxLine}" >"${STDERR}" 2>&1
  print_and_exit_if_fail $?
  print_and_exit 0
fi

# kill: run kill-job.sh from the work directory.
if [[ "${command}" == "kill" ]]; then
  # NOTE(review): this redirect truncates the STDERR file, dropping the
  # "%PROTOCOL-STDERR%" marker written during setup, whereas connect/run use
  # append (>>). Kept as is; confirm whether kill-job.sh is expected to emit
  # the marker itself.
  "${dir}/kill-job.sh" >"${STDERR}" 2>&1
  print_and_exit_if_fail $?
  print_and_exit 0
fi

# clean: nothing is done here beyond returning the (empty) protocol frame.
if [[ "${command}" == "clean" ]]; then
  print_and_exit 0
fi
