import logging
import math
import os
import time
from pathlib import Path
import parsl.providers.slurm.slurm
from parsl import HighThroughputExecutor
from parsl import WorkQueueExecutor
from parsl.channels import LocalChannel
from parsl.executors.base import ParslExecutor
from parsl.providers.base import JobState
from parsl.providers.base import JobStatus
from parsl.providers.slurm.template import template_string
from parsl.utils import wtime_to_minutes
# Parent of the directory that contains this module.
ROOT_DIR = Path(__file__).parent.parent
logger = logging.getLogger(__name__)

# Maps the compact job-state codes printed by squeue's ``%t`` format field
# onto parsl JobState values.
translate_table = dict(
    PD=JobState.PENDING,
    R=JobState.RUNNING,
    CA=JobState.CANCELLED,
    CF=JobState.PENDING,  # configuring
    CG=JobState.RUNNING,  # completing
    CD=JobState.COMPLETED,
    F=JobState.FAILED,  # failed
    TO=JobState.TIMEOUT,  # timeout
    NF=JobState.FAILED,  # node failure
    RV=JobState.FAILED,  # revoked
    SE=JobState.FAILED,  # special exit state
)
# Adapted from psiflow.
class SlurmProviderVSC(parsl.providers.slurm.slurm.SlurmProvider):
    """Slurm provider that pins the cluster and partition for sbatch, scancel, and squeue.

    Some Slurm installations (notably Belgium's HPC infrastructure) need the
    cluster, and for submission also the partition, passed explicitly on the
    command line. This subclass adds ``--clusters`` to every scheduler command.
    It adds ``--partition`` to sbatch, and appends ``#SBATCH --export=NONE`` to
    the submit-script options.

    Parameters
    ----------
    cluster : str
        Slurm cluster name passed as ``--clusters`` to sbatch, squeue and scancel.
        Required.
    **kwargs
        Forwarded to parsl's ``SlurmProvider``. ``partition`` is required.

    Raises
    ------
    ValueError
        If ``cluster`` or ``partition`` is missing.
    """

    def __init__(self, cluster=None, **kwargs):
        super().__init__(**kwargs)
        self.cluster = cluster
        # Make sure the directive starts on its own line, even when the
        # accumulated scheduler options do not end with a newline.
        if self.scheduler_options and not self.scheduler_options.endswith("\n"):
            self.scheduler_options += "\n"
        self.scheduler_options += "#SBATCH --export=NONE\n"
        # Every scheduler command needs both; raise rather than assert so the
        # check survives ``python -O``.
        if self.cluster is None:
            raise ValueError("SlurmProviderVSC requires a 'cluster'")
        if self.partition is None:
            raise ValueError("SlurmProviderVSC requires a 'partition'")

    def submit(self, command, tasks_per_node, job_name="parsl.slurm"):
        """Submit ``command`` as a Slurm job on ``self.cluster`` / ``self.partition``.

        This differs from the parent implementation in two ways. Both are needed
        for some Slurm-configured systems (notably Belgium's HPC infrastructure):

        * sbatch is called with explicit ``--clusters`` and ``--partition``
          arguments;
        * the job id is taken as the first token after "Submitted batch job".
          Anything sbatch prints after the id is ignored.

        Parameters
        ----------
        command : str
            Command to be run on the remote side.
        tasks_per_node : int
            Command invocations to be launched per node.
        job_name : str
            Base name for the job; a timestamp is appended.

        Returns
        -------
        str or None
            The Slurm job id, or None if sbatch failed or printed no job id.
        """
        scheduler_options = self.scheduler_options
        worker_init = self.worker_init
        if self.mem_per_node is not None:
            scheduler_options += f"#SBATCH --mem={self.mem_per_node}g\n"
            worker_init += f"export PARSL_MEMORY_GB={self.mem_per_node}\n"
        if self.cores_per_node is not None:
            cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
            # Trailing newline so any later directive starts on its own line.
            scheduler_options += f"#SBATCH --cpus-per-task={cpus_per_task}\n"
            worker_init += f"export PARSL_CORES={cpus_per_task}\n"

        job_name = f"{job_name}.{time.time()}"
        script_path = os.path.abspath(f"{self.script_dir}/{job_name}.submit")

        job_config = {
            "submit_script_dir": self.channel.script_dir,
            "nodes": self.nodes_per_block,
            "tasks_per_node": tasks_per_node,
            "walltime": wtime_to_minutes(self.walltime),
            "scheduler_options": scheduler_options,
            "worker_init": worker_init,
            # The command wrapped by the configured launcher.
            "user_script": self.launcher(
                command,
                tasks_per_node,
                self.nodes_per_block,
            ),
        }
        self._write_submit_script(template_string, script_path, job_name, job_config)

        if self.move_files:
            channel_script_path = self.channel.push_file(
                script_path,
                self.channel.script_dir,
            )
        else:
            channel_script_path = script_path

        submit_cmd = (
            f"sbatch --clusters={self.cluster} --partition={self.partition} "
            f"{channel_script_path}"
        )
        retcode, stdout, stderr = self.execute_wait(submit_cmd)

        if retcode != 0:
            logger.error("Submit command failed")
            logger.error(
                "Retcode:%s STDOUT:%s STDERR:%s",
                retcode,
                stdout.strip(),
                stderr.strip(),
            )
            return None

        prefix = "Submitted batch job"
        job_id = None
        for line in stdout.split("\n"):
            if not line.startswith(prefix):
                continue
            # sbatch may print more after the id (e.g. "... on cluster <name>"
            # when --clusters is used); keep only the first token.
            tokens = line.removeprefix(prefix).split()
            if not tokens:
                continue
            job_id = tokens[0]
            self.resources[job_id] = {
                "job_id": job_id,
                "status": JobStatus(JobState.PENDING),
            }
        if job_id is None:
            logger.error("sbatch succeeded but no job id was found in: %s", stdout.strip())
        return job_id

    def _status(self):
        """Refresh the cached status of every non-terminal job using squeue.

        Updates ``self.resources[job_id]["status"]`` in place and returns None.
        Queried jobs that squeue no longer lists are marked COMPLETED. If squeue
        itself fails, the cached statuses are left untouched.
        """
        active_ids = [
            jid for jid, job in self.resources.items() if not job["status"].terminal
        ]
        if not active_ids:
            logger.debug("No active jobs, skipping status update")
            return

        cmd = "squeue --clusters={1} --noheader --format='%i %t' --job '{0}'".format(
            ",".join(active_ids),
            self.cluster,
        )
        logger.debug("Executing %s", cmd)
        retcode, stdout, stderr = self.execute_wait(cmd)
        logger.debug("squeue returned %s %s", stdout, stderr)
        # execute_wait failed: do not update anything.
        if retcode != 0:
            logger.warning(f"squeue failed with non-zero exit code {retcode}:")
            logger.warning(stdout)
            logger.warning(stderr)
            return

        # Only jobs we actually asked about can be "missing". Jobs already in a
        # terminal state (CANCELLED, FAILED, TIMEOUT, ...) keep that state
        # instead of being overwritten with COMPLETED.
        jobs_missing = set(active_ids)
        for line in stdout.split("\n"):
            fields = line.split()
            # Skip blank lines and anything that is not a "<job_id> <state>" pair
            # for a tracked job. One example is the "CLUSTER: <name>" banner that
            # squeue can print when --clusters is given.
            if len(fields) != 2 or fields[0] not in self.resources:
                continue
            job_id, slurm_state = fields
            if slurm_state not in translate_table:
                logger.warning(f"Slurm status {slurm_state} is not recognized")
            status = translate_table.get(slurm_state, JobState.UNKNOWN)
            logger.debug(
                "Updating job {} with slurm status {} to parsl state {!s}".format(
                    job_id,
                    slurm_state,
                    status,
                ),
            )
            self.resources[job_id]["status"] = JobStatus(status)
            jobs_missing.discard(job_id)

        # squeue does not report on jobs that are not running, so we fill in the
        # blanks for missing jobs; information about why they ended may be lost.
        for missing_job in jobs_missing:
            logger.debug(f"Updating missing job {missing_job} to completed status")
            self.resources[missing_job]["status"] = JobStatus(JobState.COMPLETED)

    def cancel(self, job_ids):
        """Cancel the given jobs with ``scancel --clusters=<cluster>``.

        Args:
            job_ids : [<job_id> ...]

        Returns:
            [True/False ...] : one entry per job id. If the scancel call fails,
            every entry is False.
        """
        job_ids = list(job_ids)
        if not job_ids:
            # Nothing to cancel; avoid calling scancel without any job id.
            return []

        retcode, stdout, stderr = self.execute_wait(
            f"scancel --clusters={self.cluster} {' '.join(job_ids)}",
        )
        if retcode != 0:
            logger.warning(
                "scancel failed with non-zero exit code %s: %s",
                retcode,
                stderr.strip(),
            )
            return [False] * len(job_ids)

        for jid in job_ids:
            if jid in self.resources:
                self.resources[jid]["status"] = JobStatus(JobState.CANCELLED)
        return [True] * len(job_ids)
[docs]def get_slurm_provider(
env,
label,
path_internal: Path | str,
cpu_cluster,
gpu_cluster=None,
account=None,
channel=LocalChannel(),
gpu=False,
cores=None,
open_mp_threads_per_core: int | None = None,
parsl_cores=False,
mem=None,
memory_per_core=None,
walltime="48:00:00",
init_blocks=1,
min_blocks=1,
max_blocks=1,
parallelism=1,
use_work_queue: bool = True,
wq_timeout: int = 120, # in seconds
gpu_part="gpu_rome_a100",
cpu_part="cpu_rome",
py_env=None,
):
if py_env is None:
py_env = f"source {ROOT_DIR}/Miniconda3/bin/activate; which python"
if gpu_cluster is None:
gpu_cluster = cpu_cluster
assert open_mp_threads_per_core is None, "open_mp_threads_per_core is not tested yet"
worker_init = f"{py_env}; \n"
if env == "hortense":
worker_init += "module load CP2K/8.2-foss-2021a \n"
elif env == "stevin":
worker_init += "module load CP2K/7.1-foss-2020a \n"
worker_init += "module unload SciPy-bundle Python \n"
if not parsl_cores:
if open_mp_threads_per_core is None:
open_mp_threads_per_core = 1
total_cores = cores * open_mp_threads_per_core
worker_init += "unset SLURM_CPUS_PER_TASK\n"
worker_init += f"export SLURM_CPUS_PER_TASK={open_mp_threads_per_core}\n"
worker_init += f"export SLURM_NTASKS_PER_NODE={cores}\n"
worker_init += f"export SLURM_TASKS_PER_NODE={cores}\n"
worker_init += f"export SLURM_NTASKS={cores}\n"
worker_init += f"export OMP_NUM_THREADS={open_mp_threads_per_core}\n"
else:
assert open_mp_threads_per_core is None, "parsl doens't use openmp cores"
total_cores = cores
if memory_per_core is not None:
if mem is None:
mem = total_cores * memory_per_core
else:
if mem < total_cores * memory_per_core:
mem = total_cores * memory_per_core
vsc_kwargs = {
"cluster": cpu_cluster if not gpu else gpu_cluster,
"partition": cpu_part if not gpu else gpu_part,
"account": account,
"channel": channel,
"exclusive": False,
"cmd_timeout": 60,
"worker_init": worker_init,
"cores_per_node": total_cores,
"mem_per_node": mem,
"walltime": walltime,
"init_blocks": init_blocks,
"min_blocks": min_blocks,
"max_blocks": max_blocks,
"parallelism": parallelism,
# "label": label,
"nodes_per_block": 1,
}
if gpu:
vsc_kwargs[
"scheduler_options"
] = f"#SBATCH --gpus=1\n#SBATCH --cpus-per-gpu={cores}\n#SBATCH --export=None" # request gpu
provider = SlurmProviderVSC(**vsc_kwargs)
if use_work_queue:
worker_options = [
f"--cores={cores}",
f"--gpus={0 if not gpu else 1}",
]
if hasattr(provider, "walltime"):
walltime_hhmmss = provider.walltime.split(":")
assert len(walltime_hhmmss) == 3
walltime = 0.0
walltime += 3600 * float(walltime_hhmmss[0])
walltime += 60 * float(walltime_hhmmss[1])
walltime += float(walltime_hhmmss[2])
walltime -= 60 * 4 # add 4 minutes of slack
worker_options.append(f"--wall-time={walltime}")
worker_options.append(f"--timeout={wq_timeout}")
worker_options.append("--parent-death")
executor: ParslExecutor = WorkQueueExecutor(
label=label,
# env={"OMP_NUM_THREADS":f"open_mp_threads_per_core",},
working_dir=str(Path(path_internal) / label),
provider=provider,
shared_fs=True,
autocategory=False,
port=0,
max_retries=0,
worker_options=" ".join(worker_options),
)
else:
executor: ParslExecutor = HighThroughputExecutor(
label=label,
working_dir=str(Path(path_internal) / label),
cores_per_worker=cores,
provider=provider,
)
return executor
[docs]def config(
env=None,
singlepoint_nodes=16,
walltime="48:00:00",
bootstrap=False,
memory_per_core=None,
min_memery_per_node=None,
path_internal: Path | None = None,
cpu_cluster=None,
gpu_cluster=None,
py_env=None,
):
if env == "hortense":
if cpu_cluster is not None:
assert cpu_cluster in ["cpu_rome", "cpu_rome_512"]
if gpu_cluster is not None:
assert gpu_cluster in ["gpu_rome_a100"]
kw = {
"cpu_cluster": "dodrio",
"account": "2022_069",
"cpu_part": "cpu_rome" if cpu_cluster is None else cpu_cluster,
"gpu_part": "gpu_rome_a100" if gpu_cluster is None else gpu_cluster,
"path_internal": path_internal,
}
elif env == "stevin":
if cpu_cluster is not None:
assert cpu_cluster in [
"slaking",
"swalot",
"skitty",
"victini",
"kirlia",
"doduo",
"donphan",
"gallade",
]
if gpu_cluster is not None:
assert gpu_cluster in ["joltik", "accelgor"]
cpu = "doduo" if cpu_cluster is None else cpu_cluster
gpu = "accelgor" if gpu_cluster is None else gpu_cluster
kw = {
"cpu_cluster": cpu,
"cpu_part": cpu,
"gpu_cluster": gpu,
"gpu_part": gpu,
"path_internal": path_internal,
}
kw["env"] = env
kw["py_env"] = py_env
if bootstrap:
execs = [
get_slurm_provider(
**kw,
label="default",
init_blocks=1,
min_blocks=1,
max_blocks=1,
parallelism=0,
cores=1,
parsl_cores=True,
mem=10,
walltime="72:00:00",
),
]
else:
# general tasks
default = get_slurm_provider(
label="default",
init_blocks=1,
min_blocks=1,
max_blocks=512,
parallelism=1,
cores=4,
parsl_cores=True,
walltime="02:00:00",
**kw,
)
gpu_part = get_slurm_provider(
gpu=True,
label="training",
init_blocks=0,
min_blocks=0,
max_blocks=4,
parallelism=1,
cores=12,
parsl_cores=False,
walltime="02:00:00",
**kw,
)
reference = get_slurm_provider(
label="reference",
memory_per_core=memory_per_core,
mem=min_memery_per_node,
init_blocks=0,
min_blocks=0,
max_blocks=64,
parallelism=1,
cores=singlepoint_nodes,
parsl_cores=False,
walltime=walltime,
**kw,
)
execs = [default, gpu_part, reference]
return execs