# Source code for IMLCV.configs.bash_app_python

# This is a helper to run Python functions (e.g. MD simulations) as parsl bash apps on HPC infrastructure.
import argparse
import functools
import os
import sys
from datetime import datetime
from pathlib import Path

import cloudpickle
from parsl import bash_app
from parsl import File
from parsl import python_app
from parsl.dataflow.dflow import AppFuture


# @typeguard.typechecked
def bash_app_python(
    function=None,
    executors="all",
    precommand="",  # command to run before the python command
):
    """Decorate ``function`` so that each call runs it as a parsl bash app.

    Each call of the decorated function pickles ``(func, args, kwargs)`` with
    cloudpickle into a numbered ``<name>_NNN.inp.cloudpickle`` file inside an
    execution folder. It then submits a parsl ``bash_app`` that runs this
    module as a script on that file. It returns the future of a follow-up
    parsl ``python_app`` (on the ``"default"`` executor). That app loads the
    pickled result, moves the ``bash_app_``-prefixed output files to the
    requested ``outputs`` and removes the lock file.

    Usable both as ``@bash_app_python`` and as
    ``@bash_app_python(executors=..., precommand=...)``.

    Args:
        function: the function to wrap; when ``None`` a decorator is returned.
        executors: parsl executor label(s) passed to ``bash_app``.
        precommand: shell text prepended to the ``python`` command line.

    Returns:
        The wrapped callable, or a decorator when ``function`` is ``None``.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(
            *args,
            execution_folder=None,
            stdout=None,
            stderr=None,
            inputs=None,
            outputs=None,
            **kwargs,
        ):
            """Submit ``func(*args, **kwargs)`` as a parsl bash app.

            Args:
                execution_folder: folder for the pickle, lock and log files;
                    defaults to the first free ``<cwd>/<name>`` /
                    ``<cwd>/<name>_NNN`` folder. Missing parents are created.
                stdout, stderr: log file paths relative to ``execution_folder``
                    (numbered ``_NNN`` like the other files).
                inputs, outputs: lists of parsl ``File``; the bash app writes
                    ``outputs`` under a ``bash_app_`` prefix and they are moved
                    into place when the result is loaded.

            Returns:
                The future of the parsl python app that loads the result.
            """
            # Fresh lists for every call (no shared mutable defaults).
            inputs = list(inputs) if inputs is not None else []
            outputs = list(outputs) if outputs is not None else []

            def rename(file):
                # Same path with a ``bash_app_`` prefix on the file name; the bash
                # app writes there and ``load`` moves it to the requested name.
                head, tail = os.path.split(file.filepath)
                return os.path.join(head, f"bash_app_{tail}")

            if execution_folder is None:
                # First free folder among <cwd>/<name>, <cwd>/<name>_000, <cwd>/<name>_001, ...
                candidate = Path.cwd() / func.__name__
                num = 0
                while candidate.exists():
                    candidate = Path.cwd() / f"{func.__name__}_{num:0>3}"
                    num += 1
                execution_folder = candidate

            execution_folder = Path(execution_folder)
            # parents=True: a user-supplied nested folder may not exist yet.
            execution_folder.mkdir(parents=True, exist_ok=True)

            # The lock file is removed by ``load`` once the result has been transferred.
            _, lockfile = _find_num(execution_folder / f"{func.__name__}.lock")
            lockfile.touch()

            # The input pickle (filled by ``fun``) and the output pickle share one number.
            num, file_in = _find_num(execution_folder / f"{func.__name__}.inp.cloudpickle")
            file_in.touch()
            file_out = _rename_num(
                execution_folder / f"{func.__name__}.outp.cloudpickle",
                num,
            )

            # stdout and stderr likewise share one number.
            num, stdout = _find_num(
                execution_folder / (f"{func.__name__}.stdout" if stdout is None else Path(stdout)),
            )
            stderr = _rename_num(
                execution_folder / (f"{func.__name__}.stderr" if stderr is None else Path(stderr)),
                num,
            )

            def fun(*args, stdout, stderr, inputs, outputs, **kwargs):
                # Body of the parsl bash app: pickles the call into the input file
                # and returns the shell command that runs this module on it.
                import shlex
                from pathlib import Path

                from parsl import File

                # The last input is the execution folder; all paths become relative to it.
                execution_folder = Path(inputs[-1].filepath)

                outputs = [str(os.path.relpath(o.filepath, execution_folder)) for o in outputs]
                inputs = [str(os.path.relpath(i.filepath, execution_folder)) for i in inputs[:-1]]

                # The last remaining input/output are the pickle files; anything
                # before them belongs to the user's function.
                file_in = inputs[-1]
                file_out = outputs[-1]

                if len(inputs) > 1:
                    kwargs["inputs"] = [File(i) for i in inputs[:-1]]
                if len(outputs) > 1:
                    kwargs["outputs"] = [File(o) for o in outputs[:-1]]

                with open(execution_folder / file_in, "rb+") as f:
                    cloudpickle.dump((func, args, kwargs), f)

                # Quote the paths so folders/files containing spaces survive the shell;
                # for ordinary paths shlex.quote returns them unchanged.
                return (
                    f"{precommand} python -u {shlex.quote(os.path.realpath(__file__))}"
                    f" --folder {shlex.quote(str(execution_folder))}"
                    f" --file_in {shlex.quote(file_in)}"
                    f" --file_out {shlex.quote(file_out)}"
                )

            fun.__name__ = func.__name__

            future: AppFuture = bash_app(function=fun, executors=executors)(
                *args,
                inputs=[*inputs, File(str(file_in)), File(str(execution_folder))],
                outputs=[*[File(rename(o)) for o in outputs], File(str(file_out))],
                stdout=str(stdout),
                stderr=str(stderr),
                **kwargs,
            )

            def load(inputs=[], outputs=[]):
                # Runs after the bash app: returns the unpickled result, removes the
                # result pickle, moves the ``bash_app_*`` files to the requested
                # outputs and removes the lock file. The list defaults are only read.
                import os
                import shutil

                with open(inputs[-1].filepath, "rb") as f:
                    result = cloudpickle.load(f)

                os.remove(inputs[-1].filepath)
                for src, dst in zip(inputs[:-1], outputs):
                    shutil.move(src.filepath, dst.filepath)

                # transfer complete, remove lock file
                os.remove(str(lockfile))

                return result

            load.__name__ = f"{func.__name__}_load"

            return python_app(load, executors=["default"])(
                inputs=future.outputs,
                outputs=outputs,
            )

        return wrapper

    if function is not None:
        return decorator(function)
    return decorator


def _rename_num(path, num):
    """Return ``path`` with ``_NNN`` (``num`` zero-padded to 3) appended to the part of its name before the first dot."""
    parts = path.name.split(".")
    parts[0] = f"{parts[0]}_{num:0>3}"
    return path.parent / ".".join(parts)


def _find_num(path):
    """Return ``(num, numbered_path)`` for the smallest ``num`` whose ``_rename_num(path, num)`` does not exist."""
    num = 0
    while _rename_num(path, num).exists():
        num += 1
    return num, _rename_num(path, num)
def parse_args(argv=None):
    """Parse the command line that ``bash_app_python`` uses to launch this module.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``file_in``, ``file_out`` and ``folder``.
    """
    parser = argparse.ArgumentParser(
        description="Execute python function in a bash app",
    )
    parser.add_argument(
        "--file_in",
        type=str,
        required=True,
        help="path to file f containing cloudpickle.dump((func, args, kwargs), f)",
    )
    parser.add_argument(
        "--file_out",
        type=str,
        required=True,
        help="path to file f to which the result is written with cloudpickle.dump(result, f)",
    )
    parser.add_argument("--folder", type=str, required=True, help="working directory")
    return parser.parse_args(argv)


def main(argv=None):
    """Run the pickled call from ``--file_in`` and pickle its result to ``--file_out``.

    Changes into ``--folder`` first, so relative ``--file_in``/``--file_out``
    paths resolve against it. If mpi4py is importable and more than one rank
    runs, rank 0 loads the call and broadcasts it, every rank executes it, and
    rank 0 writes the gathered list of results. Otherwise the single result is
    written. Rank 0 deletes ``--file_in`` afterwards.
    """
    args = parse_args(argv)

    os.chdir(args.folder)

    rank = 0
    num_ranks = 1
    comm = None
    try:
        from mpi4py import MPI

        # Let MPI communication use cloudpickle instead of the stdlib pickle.
        MPI.pickle.__init__(cloudpickle.dumps, cloudpickle.loads)
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        num_ranks = comm.Get_size()
    except ImportError:
        # mpi4py is optional: without it the call runs in this process only.
        pass
    use_mpi = num_ranks > 1

    if rank == 0:
        print("#" * 20)
        print(f"got input {sys.argv}")
        print(f"task started at {datetime.now():%d/%m/%Y %H:%M:%S }")
        print(f"working in folder {os.getcwd()}")
        if use_mpi:
            print(f"using mpi with {num_ranks} ranks")

        with open(args.file_in, "rb") as f:
            func, fargs, fkwargs = cloudpickle.load(f)
        print("#" * 20)
    else:
        func = fargs = fkwargs = None

    if use_mpi:
        func, fargs, fkwargs = comm.bcast((func, fargs, fkwargs), root=0)

    result = func(*fargs, **fkwargs)

    if use_mpi:
        # Rank 0 receives the list of every rank's result; other ranks get None.
        result = comm.gather(result, root=0)

    if rank == 0:
        with open(args.file_out, "wb") as f:
            cloudpickle.dump(result, f)

        os.remove(args.file_in)

        print("#" * 20)
        print(f"task finished at {datetime.now():%d/%m/%Y %H:%M:%S}")
        print("#" * 20)


if __name__ == "__main__":
    main()