@@ -31,6 +31,7 @@ class FluxPythonSpawner(BaseSpawner):
3131 threads_per_core (int, optional): The number of threads per base. Defaults to 1.
3232 gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0.
3333 num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
34+ worker_id (int): The worker ID. Defaults to 0.
3435 exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to
3536 False.
3637 openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
@@ -49,6 +50,7 @@ def __init__(
4950 threads_per_core : int = 1 ,
5051 gpus_per_core : int = 0 ,
5152 num_nodes : Optional [int ] = None ,
53+ worker_id : int = 0 ,
5254 exclusive : bool = False ,
5355 priority : Optional [int ] = None ,
5456 openmpi_oversubscribe : bool = False ,
@@ -60,6 +62,7 @@ def __init__(
6062 super ().__init__ (
6163 cwd = cwd ,
6264 cores = cores ,
65+ worker_id = worker_id ,
6366 openmpi_oversubscribe = openmpi_oversubscribe ,
6467 )
6568 self ._threads_per_core = threads_per_core
@@ -121,12 +124,13 @@ def bootup(
121124 if self ._cwd is not None :
122125 jobspec .cwd = self ._cwd
123126 os .makedirs (self ._cwd , exist_ok = True )
127+ file_prefix = "flux_" + str (self ._worker_id )
124128 if self ._flux_log_files and self ._cwd is not None :
125- jobspec .stderr = os .path .join (self ._cwd , "flux .err" )
126- jobspec .stdout = os .path .join (self ._cwd , "flux .out" )
129+ jobspec .stderr = os .path .join (self ._cwd , file_prefix + " .err" )
130+ jobspec .stdout = os .path .join (self ._cwd , file_prefix + " .out" )
127131 elif self ._flux_log_files :
128- jobspec .stderr = os .path .abspath ("flux .err" )
129- jobspec .stdout = os .path .abspath ("flux .out" )
132+ jobspec .stderr = os .path .abspath (file_prefix + " .err" )
133+ jobspec .stdout = os .path .abspath (file_prefix + " .out" )
130134 if self ._priority is not None :
131135 self ._future = self ._flux_executor .submit (
132136 jobspec = jobspec , urgency = self ._priority
0 commit comments