# Helper to perform MD simulations: Python functions are wrapped as parsl bash apps and executed on HPC infrastructure.
import argparse
import os
import sys
from datetime import datetime
from pathlib import Path
import cloudpickle
from parsl import bash_app
from parsl import File
from parsl import python_app
from parsl.dataflow.dflow import AppFuture
# @typeguard.typechecked
def bash_app_python(
    function=None,
    executors="all",
    precommand="",  # command to run before the python command
):
    """Turn a Python function into a parsl bash app that runs it in a subprocess.

    Calling the decorated function does the following:

    * It writes ``(func, args, kwargs)`` with cloudpickle into a numbered
      ``<name>_NNN.inp.cloudpickle`` file inside an execution folder.
    * It submits a parsl bash app. That app runs this module as a script
      (``<precommand> python -u <this file> --folder ... --file_in ...
      --file_out ...``). The script unpickles the call, executes it and
      pickles the result.
    * It chains a parsl python app on the ``"default"`` executor. That app
      loads the result, moves the outputs the bash app produced under a
      ``bash_app_`` prefix to their requested paths, and deletes the
      numbered lock file.

    It can be used bare (``@bash_app_python``) or with arguments
    (``@bash_app_python(executors=[...], precommand="...")``).

    Args:
        function: The function to wrap when used as a bare decorator.
        executors: parsl executor label(s) the bash app may run on.
        precommand: Shell text placed before the ``python`` command. The
            script side runs the function on every MPI rank when mpi4py
            reports more than one rank.

    Returns:
        The wrapped function (bare use) or a decorator. The wrapped function
        returns the parsl future of the load step. Its result is the
        function's return value. Under multiple MPI ranks it is the list
        gathered on rank 0.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)
        def wrapper(
            *args,
            execution_folder=None,
            stdout=None,
            stderr=None,
            inputs=None,
            outputs=None,
            **kwargs,
        ):
            # Keyword-only call options:
            #   execution_folder: where the pickles, lock file and logs go.
            #       Defaults to a fresh <cwd>/<func name>[_NNN] folder.
            #   stdout / stderr: log file names. They are numbered like the
            #       pickles and resolved relative to execution_folder.
            #   inputs / outputs: parsl File objects forwarded to the app.
            # None instead of [] avoids shared mutable defaults.
            inputs = [] if inputs is None else list(inputs)
            outputs = [] if outputs is None else list(outputs)

            if execution_folder is None:
                execution_folder = _default_execution_folder(func.__name__)
            # Path() accepts str, Path and other os.PathLike values.
            execution_folder = Path(execution_folder)
            # parents=True so that a nested, not-yet-existing folder also
            # works (previously mkdir raised before a fallback could run).
            execution_folder.mkdir(parents=True, exist_ok=True)

            name = func.__name__
            _, lockfile = _next_free_numbered_path(execution_folder / f"{name}.lock")
            lockfile.touch()
            # The input and output pickles share one number.
            num, file_in = _next_free_numbered_path(
                execution_folder / f"{name}.inp.cloudpickle",
            )
            # Reserve the file name now; fun() writes the pickle into it.
            file_in.touch()
            file_out = _numbered_path(execution_folder / f"{name}.outp.cloudpickle", num)
            # stdout and stderr share their own, independent number.
            log_num, stdout = _next_free_numbered_path(
                execution_folder / (f"{name}.stdout" if stdout is None else Path(stdout)),
            )
            stderr = _numbered_path(
                execution_folder / (f"{name}.stderr" if stderr is None else Path(stderr)),
                log_num,
            )

            def fun(*args, stdout, stderr, inputs, outputs, **kwargs):
                # parsl bash app body. It pickles the call and returns the
                # shell command that executes it.
                # inputs  = [*user inputs, file_in, execution_folder]
                # outputs = [*bash_app_-prefixed user outputs, file_out]
                import os
                import shlex
                from pathlib import Path

                from parsl import File

                folder = Path(inputs[-1].filepath)
                # Make paths relative, because the script chdirs into folder.
                rel_outputs = [os.path.relpath(o.filepath, folder) for o in outputs]
                rel_inputs = [os.path.relpath(f.filepath, folder) for f in inputs[:-1]]
                file_in = rel_inputs[-1]
                file_out = rel_outputs[-1]
                if len(rel_inputs) > 1:
                    kwargs["inputs"] = [File(p) for p in rel_inputs[:-1]]
                if len(rel_outputs) > 1:
                    kwargs["outputs"] = [File(p) for p in rel_outputs[:-1]]
                with open(folder / file_in, "wb") as f:
                    cloudpickle.dump((func, args, kwargs), f)
                script = os.path.realpath(__file__)
                # Quote every path so spaces or shell metacharacters cannot
                # split or alter the command. shlex.quote leaves plain paths
                # unchanged.
                return (
                    f"{precommand} python -u {shlex.quote(script)}"
                    f" --folder {shlex.quote(str(folder))}"
                    f" --file_in {shlex.quote(file_in)}"
                    f" --file_out {shlex.quote(file_out)}"
                )

            fun.__name__ = func.__name__
            future: AppFuture = bash_app(function=fun, executors=executors)(
                *args,
                inputs=[*inputs, File(str(file_in)), File(str(execution_folder))],
                outputs=[*[File(_staged_path(o)) for o in outputs], File(str(file_out))],
                stdout=str(stdout),
                stderr=str(stderr),
                **kwargs,
            )

            def load(inputs=[], outputs=[]):
                # parsl python app. inputs are the bash app's outputs
                # (prefixed files followed by the result pickle). outputs are
                # the paths the user asked for.
                import os
                import shutil

                with open(inputs[-1].filepath, "rb") as f:
                    result = cloudpickle.load(f)
                os.remove(inputs[-1].filepath)
                for staged, final in zip(inputs[:-1], outputs):
                    shutil.move(staged.filepath, final.filepath)
                # Transfer complete, so remove the lock file.
                os.remove(str(lockfile))
                return result

            load.__name__ = f"{func.__name__}_load"
            return python_app(load, executors=["default"])(
                inputs=future.outputs,
                outputs=outputs,
            )

        return wrapper

    if function is not None:
        return decorator(function)
    return decorator


def _default_execution_folder(name):
    """Return ``<cwd>/name``, or the first free ``<cwd>/name_NNN`` if it exists."""
    candidate = Path.cwd() / name
    i = 0
    while candidate.exists():
        candidate = Path.cwd() / f"{name}_{i:0>3}"
        i += 1
    return candidate


def _numbered_path(path, i):
    """Append ``_NNN`` to the first dot-separated part of ``path``'s file name.

    For example, ``dir/foo.inp.cloudpickle`` with ``i=3`` becomes
    ``dir/foo_003.inp.cloudpickle``.
    """
    parts = path.name.split(".")
    parts[0] = f"{parts[0]}_{i:0>3}"
    return path.parent / ".".join(parts)


def _next_free_numbered_path(path):
    """Return ``(i, _numbered_path(path, i))`` for the smallest unused ``i``."""
    i = 0
    while _numbered_path(path, i).exists():
        i += 1
    return i, _numbered_path(path, i)


def _staged_path(file):
    """Return the ``bash_app_``-prefixed path next to the parsl File ``file``."""
    head, tail = os.path.split(file.filepath)
    return os.path.join(head, f"bash_app_{tail}")
if __name__ == "__main__":
    # Script side of bash_app_python. The bash app runs
    #   python -u <this file> --folder F --file_in IN --file_out OUT
    # This changes into F, unpickles (func, args, kwargs) from IN, calls
    # func(*args, **kwargs) and pickles the result into OUT.
    parser = argparse.ArgumentParser(
        description="Execute python function in a bash app",
    )
    # All options are mandatory. Without them os.chdir/open would fail later
    # with an obscure TypeError instead of a usage message.
    parser.add_argument(
        "--file_in",
        type=str,
        required=True,
        help="path to file f containing cloudpickle.dump((func, args, kwargs), f)",
    )
    parser.add_argument(
        "--file_out",
        type=str,
        required=True,
        help="path to file f that receives cloudpickle.dump(result, f)",
    )
    parser.add_argument("--folder", type=str, required=True, help="working directory")
    args = parser.parse_args()
    # Relative --file_in / --file_out paths are resolved against this folder.
    os.chdir(args.folder)

    rank = 0
    num_ranks = 1
    try:
        from mpi4py import MPI

        # Make mpi4py serialize with cloudpickle, so objects that cloudpickle
        # loaded from file_in (e.g. closures) can also be broadcast.
        MPI.pickle.__init__(cloudpickle.dumps, cloudpickle.loads)
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        num_ranks = comm.Get_size()
    except ImportError:
        # mpi4py is optional; run as a single process.
        pass
    use_mpi = num_ranks > 1

    if rank == 0:
        print("#" * 20)
        print(f"got input {sys.argv}")
        print(f"task started at {datetime.now():%d/%m/%Y %H:%M:%S }")
        print(f"working in folder {os.getcwd()}")
        if use_mpi:
            print(f"using mpi with {num_ranks} ranks")
        with open(args.file_in, "rb") as f:
            func, fargs, fkwargs = cloudpickle.load(f)
        print("#" * 20)
    else:
        func = fargs = fkwargs = None

    if use_mpi:
        # Only rank 0 read the input file; share the call with every rank.
        func = comm.bcast(func, root=0)
        fargs = comm.bcast(fargs, root=0)
        fkwargs = comm.bcast(fkwargs, root=0)

    result = func(*fargs, **fkwargs)

    if use_mpi:
        # Rank 0 receives the list of every rank's return value.
        result = comm.gather(result, root=0)

    if rank == 0:
        with open(args.file_out, "wb") as f:
            cloudpickle.dump(result, f)
        # The input pickle is no longer needed once the result is written.
        os.remove(args.file_in)
        print("#" * 20)
        print(f"task finished at {datetime.now():%d/%m/%Y %H:%M:%S}")
        print("#" * 20)