Shell Variable Expansion in qsub command through drmaa

I am running a bulk job submission to SGE (Sun Grid Engine) using the Python drmaa bindings. The job is a Python script that takes one argument and is executable from the command line through a shebang. To parameterize the bulk submission, I set environment variables that are propagated to the Python script through the -v option.

I am trying to do an indirect variable expansion in my zsh environment based on the $TASK_ID/$SGE_TASK_ID variables that SGE provides for each task of the submitted job. As a minimal reproducible example, I am trying to do something like this, which works in my shell:
export foo1=2
export num=1

echo $(tmp=foo$num; echo ${(P)tmp})
which produces 2.

The example script, job_script.py:
#! /usr/bin/python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("input_path", type=os.path.realpath)

def main(input_path):
    # do stuff
    ...

if __name__ == "__main__":
    args = parser.parse_args()
    input_path = args.input_path
    main(input_path)
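For comparison, the same indirect lookup as in the zsh example can be written in Python. This is only a minimal sketch and not part of the script above: it assumes the job environment contains SGE_TASK_ID (set by SGE for each array task) and variables named SUFFIX_TASK_ID_<n>, as in the env_list reported by qstat. The helper name resolve_suffix and the hand-built environment are hypothetical.

```python
import os


def resolve_suffix(environ=None, prefix="SUFFIX_TASK_ID_"):
    """Return the suffix assigned to the current array task.

    Same idea as zsh's ${(P)tmp}: build a variable name from the value of
    another variable, then look that name up in the environment.
    """
    env = os.environ if environ is None else environ
    task_id = env["SGE_TASK_ID"]      # assumed to be set by SGE per task
    return env[f"{prefix}{task_id}"]  # raises KeyError if it was not passed


# Hand-built environment for illustration (values taken from the post)
fake_env = {"SGE_TASK_ID": "3", "SUFFIX_TASK_ID_3": "5"}
print(resolve_suffix(fake_env))
```

Passing the environment as an argument keeps the lookup testable outside the cluster; inside a job it would fall back to os.environ.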
The example drmaa submittal script
import os

# add path to libs
os.environ["DRMAA_LIBRARY_PATH"] = "path to DRMAA shared object"
os.environ["SGE_ROOT"] = "path to SGE root directory"
import drmaa

input_dir_suffixes = [1, 2, 5, 7, 10, 11]

INPUT_BASE_DIR = "/home/mel/input_data"

base_qsub_options = {
    "P": "project",
    "q": "queue",
    "b": "y",      # the command is an executable (binary mode)
    "shell": "y",  # start the command through a shell
}
native_specification = " ".join(f"-{k} {v}" for k,v in base_qsub_options.items())
remote_command = "job_script.py"

num_task_ids = len(input_dir_suffixes)
task_start = 1
task_stop = num_task_ids + 1
task_step = 1
task_id_zip = zip(range(1, num_task_ids + 1), input_dir_suffixes) 
# variable names must match the SUFFIX_TASK_ID_<n> names expanded below
task_id_env_vars = {
    f"SUFFIX_TASK_ID_{task_id}": str(suffix) for task_id, suffix in task_id_zip
}

io_task_id = r"$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp})"
arg_task_id = r"$(tmp=SUFFIX_TASK_ID_$SGE_TASK_ID; echo ${(P)tmp})"

with drmaa.Session() as session:
    
    template = session.createJobTemplate()
    template.nativeSpecification = native_specification
    template.remoteCommand = remote_command
    template.jobEnvironment = task_id_env_vars
    template.outputPath = f":{INPUT_BASE_DIR}/output_log/{io_task_id}.o"
    template.errorPath = f":{INPUT_BASE_DIR}/error_log/{io_task_id}.e"

    args_list = [f"{INPUT_BASE_DIR}/data{arg_task_id}"]
    template.args = args_list
    session.runBulkJobs(template, task_start, task_stop - 1, task_step)
    session.deleteJobTemplate(template)
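To check what is handed to SGE without access to a cluster, the two derived values in the submission script can be built in isolation. This is a sketch only: the helper names are hypothetical, and the variable prefix is assumed from the env_list that qstat reports.

```python
def build_native_specification(options):
    """Join qsub options into the string used as nativeSpecification."""
    return " ".join(f"-{key} {value}" for key, value in options.items())


def build_task_env(suffixes, prefix="SUFFIX_TASK_ID_"):
    """Map 1-based task ids to directory suffixes as environment variables."""
    return {
        f"{prefix}{task_id}": str(suffix)
        for task_id, suffix in enumerate(suffixes, start=1)
    }


options = {"P": "project", "q": "queue", "b": "y", "shell": "y"}
print(build_native_specification(options))
print(build_task_env([1, 2, 5, 7, 10, 11]))
```

Printing both values before calling runBulkJobs makes it easy to compare them with what qstat -j reports afterwards.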
Apologies if there is a syntax error; I had to copy this by hand, as it is on a different system.

With the submission done, if I run qstat -j on the job number I get the following settings displayed:
sge_o_shell:         /usr/bin/zsh
stderr_path_list:    NONE::/home/mel/input_data/error_log/$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp}).e
stdout_path_list:    NONE::/home/mel/input_data/output_log/$(tmp=SUFFIX_TASK_ID_$TASK_ID; echo ${(P)tmp}).o
job_args:            /home/mel/input_data/data$(tmp=SUFFIX_TASK_ID$SGE_TASK_ID; echo ${(P)tmp})
script_file:         job_script.py

env_list:            SUFFIX_TASK_ID_1=1,SUFFIX_TASK_ID_2=2,SUFFIX_TASK_ID_3=5,SUFFIX_TASK_ID_4=7,SUFFIX_TASK_ID_5=10,SUFFIX_TASK_ID_6=11
The error and output logs get created, but their names are only partially expanded. Examples:
$(tmp=SUFFIX_TASK_ID1; echo ${(P)tmp}).e
$(tmp=SUFFIX_TASK_ID1; echo ${(P)tmp}).o
If we cat the error logs we see:
Illegal variable name

Is what I am trying to do possible? I am presuming that something somewhere is not activating my zsh correctly.
Asked by Melendowski (111 rep)
May 31, 2023, 10:09 PM
Last activity: Jun 4, 2023, 11:45 AM