API Reference
This is the auto-generated API reference for the dagster-slurm
library.
It is generated directly from the docstrings in the Python source code.
Core
This section covers the main components of the library.
Dagster Slurm Integration.
Run Dagster assets on Slurm clusters with support for:
- Local dev mode (no SSH/Slurm)
- Per-asset Slurm submission (staging)
- Session mode with operator fusion (production)
- Multiple launchers (Bash, Ray, Spark)
class dagster_slurm.BashLauncher(**data)
Bases: ComputeLauncher
Executes Python scripts via bash.
Uses the self-contained pixi environment extracted at runtime. Sources the activation script provided by pixi-pack.
- Parameters: data (Any)
prepare_execution(payload_path, python_executable, working_dir, pipes_context, extra_env=None, allocation_context=None, activation_script=None)
Generate bash execution plan.
- Parameters:
  - payload_path (str) – Path to Python script on remote host
  - python_executable (str) – Python from extracted environment
  - working_dir (str) – Working directory
  - pipes_context (Dict[str, str]) – Dagster Pipes environment variables
  - extra_env (Optional[Dict[str, str]]) – Additional environment variables
  - allocation_context (Optional[Dict[str, Any]]) – Slurm allocation info (for session mode)
  - activation_script (Optional[str]) – Path to activation script (provided by pixi-pack)
- Return type: ExecutionPlan
- Returns: ExecutionPlan with shell script
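Example (a minimal sketch following the documented signature; the remote paths and the Pipes context value are placeholder assumptions, not values produced by the library):

# Hypothetical inputs for illustration only
launcher = BashLauncher()
plan = launcher.prepare_execution(
    payload_path="/scratch/runs/my_asset/script.py",        # assumed remote path
    python_executable="/scratch/envs/my_asset/bin/python",  # assumed extracted env
    working_dir="/scratch/runs/my_asset",
    pipes_context={"DAGSTER_PIPES_CONTEXT": "<encoded-context>"},
)
# plan is an ExecutionPlan whose shell script sources the pixi-pack
# activation script and then invokes the payload.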
class dagster_slurm.ComputeResource(**data)
Bases: ConfigurableResource
Unified compute resource - adapts to deployment.
This is the main facade that assets depend on. Hides complexity of local vs Slurm vs session execution.
Usage:

@asset
def my_asset(context: AssetExecutionContext, compute: ComputeResource):
    return compute.run(
        context=context,
        payload_path="script.py",
        launcher=RayLauncher(num_gpus_per_node=2),
    )

Configuration Examples:

Local mode (dev):

compute = ComputeResource(mode="local")

Slurm per-asset mode (staging):

slurm = SlurmResource.from_env()
compute = ComputeResource(mode="slurm", slurm=slurm)

Slurm session mode with cluster reuse (prod):

slurm = SlurmResource.from_env()
session = SlurmSessionResource(slurm=slurm, num_nodes=10)
compute = ComputeResource(
    mode="slurm-session",
    slurm=slurm,
    session=session,
    enable_cluster_reuse=True,
    cluster_reuse_tolerance=0.2,
)

Heterogeneous job mode (optimal resource allocation):

compute = ComputeResource(
    mode="slurm-hetjob",
    slurm=slurm,
)
- Parameters: data (Any)
auto_detect_platform : bool
cleanup_on_failure : bool
cluster_reuse_tolerance : float
debug_mode : bool
default_launcher : Annotated[Union[ComputeLauncher, PartialResource]]
enable_cluster_reuse : bool
get_pipes_client(context, launcher=None)
Get appropriate Pipes client for this mode.
- Parameters:
  - context (InitResourceContext) – Dagster resource context
  - launcher (Optional[ComputeLauncher]) – Override launcher (uses default if None)
- Returns: LocalPipesClient or SlurmPipesClient
mode : ExecutionMode
model_post_init(context, /)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
  - self (BaseModel) – The BaseModel instance.
  - context (Any) – The context.
- Return type: None
pack_platform : Optional[str]
pre_deployed_env_path : Optional[str]
register_cluster(cluster_address, framework, cpus, gpus, memory_gb)
Register a newly created cluster for future reuse.
- Parameters:
  - cluster_address (str) – Address of the cluster (e.g., "10.0.0.1:6379")
  - framework (str) – "ray" or "spark"
  - cpus (int) – Total CPUs in cluster
  - gpus (int) – Total GPUs in cluster
  - memory_gb (int) – Total memory in GB
run(context, payload_path, launcher=None, extra_slurm_opts=None, resource_requirements=None, **kwargs)
Execute asset with optional resource overrides.
- Parameters:
  - context – Dagster execution context
  - payload_path (str) – Path to Python script
  - launcher (Optional[ComputeLauncher]) – Override launcher for this asset
  - extra_slurm_opts (Optional[Dict[str, Any]]) – Override Slurm options (non-session mode):
    - nodes: int
    - cpus_per_task: int
    - mem: str (e.g., "32G")
    - gpus_per_node: int
    - time_limit: str (e.g., "02:00:00")
  - resource_requirements (Optional[Dict[str, Any]]) – Resource requirements for cluster reuse (session mode):
    - cpus: int
    - gpus: int
    - memory_gb: int
    - framework: str ("ray" or "spark")
  - **kwargs – Passed to client.run()
- Yields: Dagster events
Examples
# Simple execution with default resources
yield from compute.run(context, "script.py")
# Override launcher for this asset
ray_launcher = RayLauncher(num_gpus_per_node=4)
yield from compute.run(context, "script.py", launcher=ray_launcher)
# Non-session mode: override Slurm resources
yield from compute.run(
context,
"script.py",
extra_slurm_opts={"nodes": 1, "cpus_per_task": 16, "mem": "64G", "gpus_per_node": 2}
)
# Session mode: specify resource requirements for cluster reuse
yield from compute.run(
context,
"script.py",
launcher=RayLauncher(num_gpus_per_node=2),
resource_requirements={"cpus": 32, "gpus": 2, "memory_gb": 128, "framework": "ray"}
)
run_hetjob(context, assets, launchers=None)
Run multiple assets as a heterogeneous Slurm job.
Submit all assets together with their specific resource requirements. Only waits in queue ONCE, but each asset gets the resources it needs.
- Parameters:
  - context – Dagster execution context
  - assets (List[Tuple[str, str, Dict[str, Any]]]) – List of (asset_key, payload_path, resource_requirements) tuples, where resource_requirements may contain:
    - nodes: int (default: 1)
    - cpus_per_task: int (default: 2)
    - mem: str (default: "4G")
    - gpus_per_node: int (default: 0)
    - time_limit: str (default: "01:00:00")
  - launchers (Optional[Dict[str, ComputeLauncher]]) – Optional dict mapping asset_key to ComputeLauncher
- Yields: Dagster events
Example
compute.run_hetjob(
context,
assets=[
("prep", "prep.py", {"nodes": 1, "cpus_per_task": 8, "mem": "32G"}),
("train", "train.py", {"nodes": 4, "cpus_per_task": 32, "mem": "128G", "gpus_per_node": 2}),
("infer", "infer.py", {"nodes": 8, "cpus_per_task": 16, "mem": "64G", "gpus_per_node": 1}),
],
launchers={
"train": RayLauncher(num_gpus_per_node=2),
"infer": RayLauncher(num_gpus_per_node=1),
}
)
session : Optional[SlurmSessionResource]
slurm : Optional[SlurmResource]
teardown(context)
Teardown method called by Dagster at end of run. Ensures session resources and clusters are cleaned up.
- Parameters:
  context (InitResourceContext)
validate_configuration()
Validate configuration - runs during Pydantic validation.
- Return type: ComputeResource
class dagster_slurm.LocalPipesClient(launcher, base_dir='/tmp/dagster_local_runs', require_pixi=True)
Bases: PipesClient
Pipes client for local execution (dev mode). No SSH, no Slurm - just runs scripts locally via subprocess.
- Parameters:
  - launcher (ComputeLauncher)
  - base_dir (str)
  - require_pixi (bool)
run(context, *, payload_path, python_executable=None, extra_env=None, extras=None, extra_slurm_opts=None)
Execute payload locally.
- Parameters:
  - context (AssetExecutionContext) – Dagster execution context
  - payload_path (str) – Path to Python script to execute
  - python_executable (Optional[str]) – Python interpreter (defaults to current)
  - extra_env (Optional[Dict[str, str]]) – Additional environment variables
  - extras (Optional[Dict[str, Any]]) – Extra data to pass via Pipes
  - extra_slurm_opts (Optional[Dict[str, Any]])
- Yields: Dagster events (materializations, logs, etc.)
- Return type: Iterator
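Example (a minimal dev-mode sketch; the asset body and the "script.py" payload are illustrative assumptions):

from dagster import AssetExecutionContext, asset
from dagster_slurm import BashLauncher, LocalPipesClient

client = LocalPipesClient(launcher=BashLauncher())

@asset
def my_local_asset(context: AssetExecutionContext):
    # Runs script.py locally via subprocess - no SSH, no Slurm.
    yield from client.run(context=context, payload_path="script.py")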
class dagster_slurm.RayLauncher(**data)
Bases: ComputeLauncher
Ray distributed computing launcher.
Features:
- Robust cluster startup with sentinel-based shutdown
- Graceful cleanup on SIGTERM/SIGINT
- Worker registration monitoring
- Automatic head node detection
- IPv4/IPv6 normalization
Modes:
- Local: Single-node Ray
- Cluster: Multi-node Ray cluster across Slurm allocation (via allocation_context)
- Connect: Connect to existing cluster (via ray_address)
- Parameters: data (Any)
dashboard_port : int
grace_period : int
head_startup_timeout : int
num_gpus_per_node : int
object_store_memory_gb : Optional[int]
prepare_execution(payload_path, python_executable, working_dir, pipes_context, extra_env=None, allocation_context=None, activation_script=None)
Generate Ray execution plan.
- Parameters:
  - payload_path (str)
  - python_executable (str)
  - working_dir (str)
  - pipes_context (Dict[str, str])
  - extra_env (Optional[Dict[str, str]])
  - allocation_context (Optional[Dict[str, Any]])
  - activation_script (Optional[str])
- Return type: ExecutionPlan
ray_address : Optional[str]
ray_port : int
redis_password : Optional[str]
worker_startup_delay : int
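Example (a hedged sketch of the three documented modes; the address and GPU counts are placeholder assumptions):

# Local: single-node Ray on the current machine
local = RayLauncher()

# Cluster: multi-node Ray across a Slurm allocation (allocation_context is
# supplied by the session machinery, not by user code)
cluster = RayLauncher(num_gpus_per_node=2)

# Connect: attach to an already-running Ray cluster
connected = RayLauncher(ray_address="10.0.0.1:6379")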
class dagster_slurm.SSHConnectionResource(**data)
Bases: ConfigurableResource
SSH connection settings.
This resource configures a connection to a remote host via SSH. It supports key-based or password-based authentication, pseudo-terminal allocation (-t), and connections through a proxy jump host.
Supports two authentication methods:
- SSH key (recommended for automation)
- Password (for interactive use or when keys are unavailable)
Either key_path OR password must be provided (not both).
Examples
# Key-based auth
ssh = SSHConnectionResource(
host="cluster.example.com",
user="username",
key_path="~/.ssh/id_rsa",
)
# With a proxy jump host
jump_box = SSHConnectionResource(
host="jump.example.com", user="jumpuser", key_path="~/.ssh/jump_key"
)
ssh_via_jump = SSHConnectionResource(
host="private-cluster",
user="user_on_cluster",
key_path="~/.ssh/cluster_key",
jump_host=jump_box
)
# With a post-login command (e.g., for VSC)
vsc_ssh = SSHConnectionResource(
host="vmos.vsc.ac.at",
user="dagster01",
key_path="~/.ssh/vsc_key",
force_tty=True,
post_login_command="vsc5"
)
# From environment variables
ssh = SSHConnectionResource.from_env()
- Parameters: data (Any)
extra_opts : List[str]
force_tty : bool
classmethod from_env(prefix='SLURM_SSH', _is_jump=False)
Create from environment variables.
This method reads connection details from environment variables. The variable names are constructed using the provided prefix.
With the default prefix, the following variables are used:
- SLURM_SSH_HOST – SSH hostname (required)
- SLURM_SSH_PORT – SSH port (optional, default: 22)
- SLURM_SSH_USER – SSH username (required)
- SLURM_SSH_KEY – Path to SSH key (optional)
- SLURM_SSH_PASSWORD – SSH password (optional)
- SLURM_SSH_FORCE_TTY – Set to 'true' or '1' to enable TTY allocation (optional)
- SLURM_SSH_POST_LOGIN_COMMAND – Post-login command string (optional)
- SLURM_SSH_OPTS_EXTRA – Additional SSH options (optional)
For proxy jumps, append _JUMP to the prefix for the jump host's variables (e.g., SLURM_SSH_JUMP_HOST, SLURM_SSH_JUMP_USER, etc.).
- Parameters:
  - prefix (str) – Environment variable prefix (default: "SLURM_SSH")
  - _is_jump (bool)
- Return type: SSHConnectionResource
- Returns: SSHConnectionResource instance
get_proxy_command_opts()
Builds SSH options for ProxyCommand if a jump_host is configured.
- Return type: List[str]
get_remote_target()
Get the remote target string for SCP commands.
- Return type: str
get_scp_base_command()
Build base SCP command, including proxy and auth options.
- Return type: List[str]
get_ssh_base_command()
Build base SSH command, including proxy and auth options.
- Return type: List[str]
host : str
jump_host : Optional[SSHConnectionResource]
key_path : Optional[str]
password : Optional[str]
port : int
post_login_command : Optional[str]
user : str
property uses_key_auth : bool
Returns True if using key-based authentication.
property uses_password_auth : bool
Returns True if using password-based authentication.
class dagster_slurm.SlurmAllocation(slurm_job_id, nodes, working_dir, config)
Bases: object
Represents a running Slurm allocation.
- Parameters:
  - slurm_job_id (int)
  - nodes (List[str])
  - working_dir (str)
  - config (SlurmSessionResource)
cancel(ssh_pool)
Cancel the allocation.
- Parameters:
  ssh_pool (SSHConnectionPool)
execute(execution_plan, asset_key, ssh_pool)
Execute plan in this allocation via srun.
- Parameters:
  - execution_plan (ExecutionPlan)
  - asset_key (str)
  - ssh_pool (SSHConnectionPool)
- Return type: int
get_failed_nodes()
Get list of failed nodes.
- Return type: List[str]
is_healthy(ssh_pool)
Check if allocation and nodes are healthy.
- Parameters:
  ssh_pool (SSHConnectionPool)
- Return type: bool
class dagster_slurm.SlurmPipesClient(slurm_resource, launcher, session_resource=None, cleanup_on_failure=True, debug_mode=False, auto_detect_platform=True, pack_platform=None, pre_deployed_env_path=None)
Bases: PipesClient
Pipes client for Slurm execution with real-time log streaming and cancellation support.
Features:
- Real-time stdout/stderr streaming to Dagster logs
- Packaging environment with pixi pack
- Auto-reconnect message reading
- Metrics collection
- Graceful cancellation with Slurm job termination
Works in two modes:
- Standalone: Each asset = separate sbatch job
- Session: Multiple assets share a Slurm allocation (operator fusion)
- Parameters:
  - slurm_resource (SlurmResource)
  - launcher (ComputeLauncher)
  - session_resource (Optional[SlurmSessionResource])
  - cleanup_on_failure (bool)
  - debug_mode (bool)
  - auto_detect_platform (bool)
  - pack_platform (Optional[str])
  - pre_deployed_env_path (Optional[str])
run(context, *, payload_path, extra_env=None, extras=None, use_session=False, extra_slurm_opts=None, **kwargs)
Execute payload on Slurm cluster with real-time log streaming.
- Parameters:
  - context (AssetExecutionContext) – Dagster execution context
  - payload_path (str) – Local path to Python script
  - launcher – Ignored (launcher is set at client construction time)
  - extra_env (Optional[Dict[str, str]]) – Additional environment variables
  - extras (Optional[Dict[str, Any]]) – Extra data to pass via Pipes
  - use_session (bool) – If True and session_resource provided, use shared allocation
  - extra_slurm_opts (Optional[Dict[str, Any]]) – Override Slurm options (non-session mode)
  - **kwargs – Additional arguments (ignored, for forward compatibility)
- Yields: Dagster events
- Return type: Iterator
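Example (a hedged standalone-mode sketch built from the documented signatures; the payload path and Slurm options are placeholder assumptions):

from dagster import AssetExecutionContext, asset
from dagster_slurm import BashLauncher, SlurmPipesClient, SlurmResource

client = SlurmPipesClient(
    slurm_resource=SlurmResource.from_env(),
    launcher=BashLauncher(),
)

@asset
def remote_asset(context: AssetExecutionContext):
    # Standalone mode: this asset is submitted as its own sbatch job,
    # with stdout/stderr streamed back into the Dagster logs.
    yield from client.run(
        context=context,
        payload_path="script.py",
        extra_slurm_opts={"nodes": 1, "cpus_per_task": 4, "mem": "8G"},
    )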
class dagster_slurm.SlurmQueueConfig(**data)
Bases: ConfigurableResource
Default Slurm job submission parameters. These can be overridden per-asset via metadata or function arguments.
- Parameters: data (Any)
cpus : int
gpus_per_node : int
mem : str
mem_per_cpu : str
num_nodes : int
partition : str
time_limit : str
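Example (a hedged sketch using the documented fields; the partition name and limits are placeholder assumptions that assets may override):

queue = SlurmQueueConfig(
    partition="gpu",  # assumed partition name
    num_nodes=1,
    cpus=8,
    mem="32G",
    gpus_per_node=1,
    time_limit="02:00:00",
)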
class dagster_slurm.SlurmResource(**data)
Bases: ConfigurableResource
Complete Slurm cluster configuration. Combines SSH connection, queue defaults, and cluster-specific paths.
- Parameters: data (Any)
classmethod from_env()
Create from environment variables.
- Return type: SlurmResource
classmethod from_env_slurm(ssh)
Create a SlurmResource by populating most fields from environment variables, but requires an explicit, pre-configured SSHConnectionResource to be provided.
- Parameters:
  ssh (SSHConnectionResource) – A fully configured SSHConnectionResource instance.
- Return type: SlurmResource
queue : Annotated[Union[SlurmQueueConfig, PartialResource]]
remote_base : Optional[str]
ssh : Annotated[Union[SSHConnectionResource, PartialResource]]
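Example (a minimal sketch combining the documented pieces; hostnames and paths are placeholder assumptions):

slurm = SlurmResource(
    ssh=SSHConnectionResource(
        host="cluster.example.com",
        user="username",
        key_path="~/.ssh/id_rsa",
    ),
    queue=SlurmQueueConfig(partition="batch", time_limit="01:00:00"),
    remote_base="/scratch/dagster",  # assumed remote working directory
)

# Or populate everything from environment variables:
slurm = SlurmResource.from_env()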
class dagster_slurm.SlurmSessionResource(**data)
Bases: ConfigurableResource
Slurm session resource for operator fusion.
This is a proper Dagster resource that manages the lifecycle of a Slurm allocation across multiple assets in a run.
Usage in definitions.py:

session = SlurmSessionResource(
    slurm=slurm,
    num_nodes=4,
    time_limit="04:00:00",
)

- Parameters: data (Any)
enable_health_checks : bool
enable_session : bool
execute_in_session(execution_plan, asset_key)
Execute workload in the shared allocation. Thread-safe for parallel asset execution.
- Parameters:
  - execution_plan (ExecutionPlan)
  - asset_key (str)
- Return type: int
max_concurrent_jobs : int
model_post_init(context, /)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
  - self (BaseModel) – The BaseModel instance.
  - context (Any) – The context.
- Return type: None
num_nodes : int
partition : Optional[str]
setup_for_execution(context)
Called by Dagster when resource is initialized for a run. This is the proper Dagster resource lifecycle hook.
- Parameters:
  context (InitResourceContext)
- Return type: SlurmSessionResource
slurm : SlurmResource
teardown_after_execution(context)
Called by Dagster when resource is torn down after run completion. This is the proper Dagster resource lifecycle hook.
- Parameters:
  context (InitResourceContext)
- Return type: None
time_limit : str
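Example (a hedged wiring sketch for definitions.py, mirroring the usage documented above; the asset name and payload path are placeholders):

from dagster import AssetExecutionContext, Definitions, asset
from dagster_slurm import ComputeResource, SlurmResource, SlurmSessionResource

slurm = SlurmResource.from_env()
session = SlurmSessionResource(slurm=slurm, num_nodes=4, time_limit="04:00:00")

@asset
def my_asset(context: AssetExecutionContext, compute: ComputeResource):
    # All assets in the run share the session's Slurm allocation.
    return compute.run(context=context, payload_path="script.py")

defs = Definitions(
    assets=[my_asset],
    resources={
        "compute": ComputeResource(mode="slurm-session", slurm=slurm, session=session),
    },
)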
class dagster_slurm.SparkLauncher(**data)
Bases: ComputeLauncher
Apache Spark launcher.
Modes:
- Local: Single-node Spark (no allocation_context)
- Cluster: Spark cluster across Slurm allocation (via allocation_context)
- Standalone: Connect to existing Spark cluster (via master_url)
- Parameters: data (Any)
driver_memory : str
executor_cores : int
executor_memory : str
master_url : Optional[str]
num_executors : Optional[int]
prepare_execution(payload_path, python_executable, working_dir, pipes_context, extra_env=None, allocation_context=None, activation_script=None)
Generate Spark execution plan.
- Parameters:
  - payload_path (str)
  - python_executable (str)
  - working_dir (str)
  - pipes_context (Dict[str, str])
  - extra_env (Optional[Dict[str, str]])
  - allocation_context (Optional[Dict[str, Any]])
  - activation_script (Optional[str])
- Return type: ExecutionPlan
spark_home : str
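Example (a hedged sketch of the three documented modes; the master URL and executor sizes are placeholder assumptions):

# Local: single-node Spark (no allocation_context)
local = SparkLauncher()

# Cluster: Spark across a Slurm allocation (allocation_context is supplied
# by the session machinery, not by user code)
cluster = SparkLauncher(executor_memory="8G", executor_cores=4)

# Standalone: connect to an existing Spark cluster
standalone = SparkLauncher(master_url="spark://10.0.0.1:7077")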