Writing Custom Runners
While the official runners cover most common use cases, you might need to create your own custom runner for specific requirements - like running commands on a remote cluster, implementing custom logging, or handling specialized file systems.
Understanding the Runner Protocol
Custom runners need to implement two main protocols: Runner
and Execution
. Think of the Runner
as a factory that creates Execution
objects, and each Execution
handles a single command run.
The Runner Protocol
Your runner class needs to implement just one method:
class MyRunner:
def start_execution(self, metadata: Metadata) -> Execution:
"""Start an execution for a specific tool.
Args:
metadata: Information about the tool being executed (name, package, etc.)
Returns:
An Execution object that will handle the actual command execution
"""
The Execution Protocol
The Execution
object does the heavy lifting with four key methods:
class MyExecution:
def input_file(self, host_file: InputPathType, resolve_parent: bool = False, mutable: bool = False) -> str:
"""Handle input files - return where the command should find them"""
def output_file(self, local_file: str, optional: bool = False) -> OutputPathType:
"""Handle output files - return where they'll be stored on the host"""
def params(self, params: dict) -> dict:
"""Process or modify command parameters if needed"""
def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None:
"""Actually execute the command"""
Example: Simple Custom Runner
Let's build a custom runner that adds some logging and stores outputs in timestamped directories:
import pathlib
import subprocess
import logging
from datetime import datetime
from styxdefs import Runner, Execution, Metadata, InputPathType, OutputPathType
class TimestampedExecution(Execution):
def __init__(self, output_dir: pathlib.Path, metadata: Metadata):
self.output_dir = output_dir
self.metadata = metadata
self.logger = logging.getLogger(f"custom_runner.{metadata.name}")
# Create output directory
self.output_dir.mkdir(parents=True, exist_ok=True)
def input_file(self, host_file: InputPathType, resolve_parent: bool = False, mutable: bool = False) -> str:
# For simplicity, just return the absolute path
return str(pathlib.Path(host_file).absolute())
def output_file(self, local_file: str, optional: bool = False) -> OutputPathType:
return self.output_dir / local_file
def params(self, params: dict) -> dict:
# Log parameters for debugging
self.logger.info(f"Running {self.metadata.name} with params: {params}")
return params
def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None:
self.logger.info(f"Executing: {' '.join(cargs)}")
# Run the command
result = subprocess.run(
cargs,
cwd=self.output_dir,
capture_output=True,
text=True
)
# Handle output
if handle_stdout and result.stdout:
for line in result.stdout.splitlines():
handle_stdout(line)
if handle_stderr and result.stderr:
for line in result.stderr.splitlines():
handle_stderr(line)
if result.returncode != 0:
raise RuntimeError(f"Command failed with return code {result.returncode}")
class TimestampedRunner(Runner):
def __init__(self, base_dir: str = "custom_outputs"):
self.base_dir = pathlib.Path(base_dir)
self.execution_counter = 0
def start_execution(self, metadata: Metadata) -> Execution:
# Create timestamped directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = self.base_dir / f"{timestamp}_{self.execution_counter}_{metadata.name}"
self.execution_counter += 1
return TimestampedExecution(output_dir, metadata)
# Usage
from styxdefs import set_global_runner
custom_runner = TimestampedRunner(base_dir="my_custom_outputs")
set_global_runner(custom_runner)
Learning from Official Runners
The best way to understand custom runners is to look at how the official ones work:
LocalRunner Pattern
The LocalRunner
is the simplest - it just runs commands directly on the host system. Notice how it:
- Creates unique output directories using a hash and counter
- Uses
subprocess.Popen
with threading for real-time output handling - Handles file paths by converting them to absolute paths
DockerRunner Pattern
The DockerRunner
shows more complex file handling:
- Maps host files to container paths using Docker mounts
- Creates a run script inside the container
- Handles the complexity of translating between host and container file systems
Advanced Patterns
Middleware Runners
You can create runners that wrap other runners to add functionality:
class LoggingRunner(Runner):
def __init__(self, wrapped_runner: Runner):
self.wrapped_runner = wrapped_runner
self.logger = logging.getLogger("logging_runner")
def start_execution(self, metadata: Metadata) -> Execution:
self.logger.info(f"Starting execution of {metadata.name}")
execution = self.wrapped_runner.start_execution(metadata)
return LoggingExecution(execution, self.logger)
class LoggingExecution(Execution):
def __init__(self, wrapped_execution: Execution, logger):
self.wrapped = wrapped_execution
self.logger = logger
def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None:
self.logger.info(f"Running command: {' '.join(cargs)}")
start_time = datetime.now()
try:
self.wrapped.run(cargs, handle_stdout, handle_stderr)
duration = datetime.now() - start_time
self.logger.info(f"Command completed in {duration}")
except Exception as e:
self.logger.error(f"Command failed: {e}")
raise
# Forward other methods to wrapped execution
def input_file(self, *args, **kwargs): return self.wrapped.input_file(*args, **kwargs)
def output_file(self, *args, **kwargs): return self.wrapped.output_file(*args, **kwargs)
def params(self, *args, **kwargs): return self.wrapped.params(*args, **kwargs)
Remote Execution
For cluster or remote execution, you might implement SSH-based runners, SLURM job submission, or cloud-based execution.
Tips for Custom Runners
File Handling: Think carefully about where files live and how paths get translated. Input files need to be accessible to your execution environment, and output files need to end up where the user expects them.
Error Handling: Always check return codes and provide meaningful error messages. Consider implementing custom exception types like StyxDockerError
for better debugging.
Logging: Good logging makes debugging much easier. Use structured logging with appropriate levels.
Threading: If you need real-time output handling (like showing progress), consider using threading like the official runners do.
Testing: Test your runner with various tools and edge cases. The DryRunner
pattern is useful for testing command generation without actual execution.
tip
Start simple! Create a basic working runner first, then add advanced features like custom file handling or remote execution once you have the basics working.
tip
Look at the source code of official runners for inspiration. They handle many edge cases you might not think of initially.