Workflows and orchestration
WorkflowFunction wraps every op and every user-written
@workflow_function. Module is the stateful container with
@workflow_method children.
Prefect orchestration is off by default. To enable it, set
prefect_config.workflow_kind = WorkflowKind.TASK (or FLOW) globally, or
export PROBPIPE_WORKFLOW_KIND=task in the environment.
Wrappers and decorators
WorkflowFunction(*, func, workflow_kind=WorkflowKind.DEFAULT, name=None, bind=None, module=None, n_broadcast_samples=None, vectorize='auto', parallel=False, seed=0, include_inputs=False, **kwargs)
Bases: Node
A single executable DAG node wrapping exactly one function.
Infers dependency-vs-input from the function signature and type hints. Optionally resolves missing values from an attached Module.
Broadcasting: When a Distribution is passed for an argument whose
type hint is not a Distribution subclass, the workflow automatically
samples from the distribution and calls the wrapped function once per
sample, returning an EmpiricalDistribution over the outputs (or a
plain list when results are not numeric).
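The broadcasting rule above can be illustrated with a minimal, self-contained sketch. It does not import ProbPipe: `ToyNormal` and `broadcast_call` are hypothetical stand-ins, the type-hint check is omitted for brevity, and the result is a plain list rather than an EmpiricalDistribution.

```python
import random


class ToyNormal:
    """Hypothetical stand-in for a Distribution (not ProbPipe's class)."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def sample(self, n, seed=0):
        # Seeded generator so repeated runs draw the same values.
        rng = random.Random(seed)
        return [rng.gauss(self.mean, self.std) for _ in range(n)]


def broadcast_call(func, n_samples, seed=0, **kwargs):
    """Call func once per sample for every ToyNormal-valued argument."""
    dist_args = {k: v for k, v in kwargs.items() if isinstance(v, ToyNormal)}
    if not dist_args:
        return func(**kwargs)  # nothing to broadcast: one plain call
    # One column of samples per distribution-valued argument.
    draws = {
        name: dist.sample(n_samples, seed=seed + i)
        for i, (name, dist) in enumerate(dist_args.items())
    }
    results = []
    for i in range(n_samples):
        call_kwargs = dict(kwargs)
        call_kwargs.update({name: column[i] for name, column in draws.items()})
        results.append(func(**call_kwargs))
    return results


def scale(x: float, factor: float) -> float:
    return x * factor


# x is a distribution, so scale runs once per sample.
outputs = broadcast_call(scale, n_samples=5, x=ToyNormal(0.0, 1.0), factor=2.0)
# No distribution arguments, so scale runs exactly once.
plain = broadcast_call(scale, n_samples=5, x=1.5, factor=2.0)
```

Here `outputs` holds five floats, one per sample, while `plain` is the single value from an ordinary call.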
Vectorization and orchestration are orthogonal concerns:
- Vectorization (vectorize) controls how samples are dispatched: jax.vmap for JAX-traceable functions, or a Python loop otherwise.
- Orchestration (workflow_kind) controls whether the dispatch is wrapped in a Prefect task or flow for compute-graph tracing.
When both are active, the JAX-vectorized computation is executed inside
a Prefect task/flow, giving the benefits of vmap performance with
full Prefect lineage tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | The function to wrap. | required |
| workflow_kind | WorkflowKind | Prefect orchestration mode. | DEFAULT |
| name | str or None | Display name; defaults to … | None |
| bind | dict or None | Construction-time keyword bindings (defaults / config). | None |
| module | Module or None | Parent module for input / dependency resolution. | None |
| n_broadcast_samples | int | Default number of samples drawn when broadcasting. Can be overridden at call time by passing … | None |
| vectorize | str | Vectorization strategy for broadcasting: … | 'auto' |
| parallel | bool or int | Controls parallel execution during broadcasting (…) | False |
| seed | int | Random seed for JAX PRNG key management during broadcasting. | 0 |
Source code in probpipe/core/node.py
effective_workflow_kind (property)
Resolve the orchestration mode for this instance.
Resolution order:
- Per-instance override (anything other than DEFAULT).
- Global prefect_config.workflow_kind.
- If global is also DEFAULT, fall back to OFF. Prefect orchestration is opt-in: set the global or per-instance workflow_kind to TASK/FLOW, or export PROBPIPE_WORKFLOW_KIND=task in the environment.
If Prefect is not installed but TASK or FLOW is requested
(either per-instance or globally), a warning is emitted and the
mode falls back to OFF.
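The resolution order above can be sketched as a small pure function. This is an illustration only, not the library's code: `Kind` and `resolve_kind` are hypothetical names that mirror the documented members, and the warning emitted by the real property is reduced to a comment.

```python
from enum import Enum


class Kind(Enum):
    """Hypothetical stand-in mirroring the WorkflowKind members."""

    DEFAULT = "default"
    OFF = "off"
    TASK = "task"
    FLOW = "flow"


def resolve_kind(instance_kind, global_kind, prefect_installed=True):
    # 1. A per-instance value other than DEFAULT wins.
    if instance_kind is not Kind.DEFAULT:
        kind = instance_kind
    # 2. Otherwise inherit the global setting.
    else:
        kind = global_kind
    # 3. If the result is still DEFAULT, orchestration stays off.
    if kind is Kind.DEFAULT:
        kind = Kind.OFF
    # Without Prefect, TASK/FLOW degrade to OFF (the documented
    # behavior also emits a warning at this point).
    if kind in (Kind.TASK, Kind.FLOW) and not prefect_installed:
        kind = Kind.OFF
    return kind


inherited = resolve_kind(Kind.DEFAULT, Kind.TASK)
overridden = resolve_kind(Kind.FLOW, Kind.TASK)
shipped_default = resolve_kind(Kind.DEFAULT, Kind.DEFAULT)
no_prefect = resolve_kind(Kind.TASK, Kind.OFF, prefect_installed=False)
```

Note that an explicit per-instance OFF also counts as an override, so it disables orchestration even when the global setting is TASK.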
Module(*, workflow_kind=WorkflowKind.DEFAULT, **kwargs)
Bases: Node
Container for workflow nodes with shared inputs and child nodes.
New user-facing API: MyModule(data=data_node, horizon=30, alpha=0.1)
Internally:
- kwargs whose values are Node instances become child_nodes
- everything else becomes inputs
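The keyword-argument split described above amounts to a single isinstance check per value. The sketch below assumes nothing about the real constructor beyond that rule; `ToyNode` and `split_kwargs` are hypothetical names used for illustration.

```python
class ToyNode:
    """Hypothetical stand-in for Node."""

    def __init__(self, name):
        self.name = name


def split_kwargs(**kwargs):
    # Node-valued kwargs become children; everything else is a shared input.
    child_nodes = {k: v for k, v in kwargs.items() if isinstance(v, ToyNode)}
    inputs = {k: v for k, v in kwargs.items() if not isinstance(v, ToyNode)}
    return child_nodes, inputs


data_node = ToyNode("data")
child_nodes, inputs = split_kwargs(data=data_node, horizon=30, alpha=0.1)
```

With the call from the docstring, `data` lands in `child_nodes` and `horizon` and `alpha` land in `inputs`.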
dag()
Return a Graphviz DAG visualization of this module.
workflow_function(_func=None, /, **kwargs)
Decorator to create a WorkflowFunction from a plain function.
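Can be used with or without arguments::

    # Bare decorator: all options keep their defaults.
    @workflow_function
    def my_func(x, y):
        return x + y

    # With arguments: options are forwarded to WorkflowFunction.
    @workflow_function(n_broadcast_samples=100, vectorize="loop")
    def my_func(x, y):
        return x + y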
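The `(_func=None, /, **kwargs)` signature is the usual Python idiom for a decorator that works both bare and with arguments. A minimal sketch of that idiom follows; `toy_workflow_function` is a hypothetical stand-in that only records its options and is not how ProbPipe builds a WorkflowFunction.

```python
import functools


def toy_workflow_function(_func=None, /, **options):
    def wrap(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Keep the decorator options so callers can inspect them.
        wrapper.options = options
        return wrapper

    if _func is None:
        # Used as @toy_workflow_function(...): return the real decorator.
        return wrap
    # Used as bare @toy_workflow_function: decorate immediately.
    return wrap(_func)


@toy_workflow_function
def add(x, y):
    return x + y


@toy_workflow_function(n_broadcast_samples=100, vectorize="loop")
def mul(x, y):
    return x * y
```

Making `_func` positional-only keeps it from colliding with a keyword option of the same name.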
workflow_method(func)
Mark a method as a workflow method for Module subclasses.
Methods decorated with @workflow_method are automatically
converted to WorkflowFunction instances when the
Module is instantiated.
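One common way to get this "mark now, wrap at instantiation" behavior is to tag the function in the decorator and scan for tags in `__init__`. The sketch below shows that pattern under stated assumptions: every name in it (`toy_workflow_method`, `ToyModule`, `ToyWorkflowFunction`, the `_is_workflow_method` attribute) is hypothetical, and ProbPipe's actual mechanism may differ.

```python
class ToyWorkflowFunction:
    """Hypothetical stand-in for WorkflowFunction wrapping one bound method."""

    def __init__(self, func, name):
        self.func = func
        self.name = name

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def toy_workflow_method(func):
    # Only tag the function; wrapping is deferred until instantiation.
    func._is_workflow_method = True
    return func


class ToyModule:
    def __init__(self):
        cls = type(self)
        for name in dir(cls):
            attr = getattr(cls, name, None)
            if getattr(attr, "_is_workflow_method", False):
                # Bind the method to this instance, then shadow it
                # with a wrapper stored on the instance itself.
                bound = attr.__get__(self, cls)
                setattr(self, name, ToyWorkflowFunction(bound, name))


class Forecast(ToyModule):
    @toy_workflow_method
    def double(self, x):
        return 2 * x

    def helper(self):
        return "plain"


m = Forecast()
```

After construction, `m.double` is a wrapper object while the untagged `m.helper` remains an ordinary bound method.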
abstract_workflow_method(func)
Mark a method as an abstract workflow interface.
Combines @abstractmethod with @workflow_method so that
AbstractModule subclasses can declare workflow-shaped
interfaces without providing implementations.
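Combining the two decorators can be sketched with the standard library's `abc` module. The names below (`toy_abstract_workflow_method`, `ToyAbstractModule`, the `_is_workflow_method` tag) are hypothetical and serve only to illustrate the idea; they are not ProbPipe's implementation.

```python
from abc import ABC, abstractmethod


def toy_abstract_workflow_method(func):
    # Tag as a workflow method, then mark abstract so that subclasses
    # must override it before they can be instantiated.
    func._is_workflow_method = True
    return abstractmethod(func)


class ToyAbstractModule(ABC):
    @toy_abstract_workflow_method
    def fit(self, data):
        """Interface only; subclasses supply the body."""


class MeanModel(ToyAbstractModule):
    def fit(self, data):
        return sum(data) / len(data)


try:
    ToyAbstractModule()
    instantiated = True
except TypeError:
    # ABC refuses to instantiate a class with unimplemented abstract methods.
    instantiated = False

result = MeanModel().fit([1, 2, 3])
```

The abstract base cannot be instantiated, while the concrete subclass can once it overrides `fit`.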
Orchestration configuration
WorkflowKind
Bases: Enum
Orchestration mode for WorkflowFunction instances.
Members:
- DEFAULT: At the per-instance level, inherit from the global config. The shipped global default is OFF unless overridden via PROBPIPE_WORKFLOW_KIND or explicit assignment to prefect_config.workflow_kind.
- OFF: No Prefect orchestration; plain Python execution.
- TASK: Wrap execution in a Prefect task (via task.map()). Raises ImportError if Prefect is not installed.
- FLOW: Wrap execution in a Prefect flow. Raises ImportError if Prefect is not installed.
prefect_config = PrefectConfig() (module attribute)
PROBPIPE_WORKFLOW_KIND environment variable
PROBPIPE_WORKFLOW_KIND (case-insensitive: off / task / flow /
default) sets the initial prefect_config.workflow_kind at import time.
Unknown values raise ValueError. prefect_config.reset() re-reads the
variable.
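The parsing rule described here (case-insensitive, four accepted values, ValueError otherwise) can be sketched as follows. `Kind` and `kind_from_env` are hypothetical names, and treating an unset variable as DEFAULT is an assumption made for this sketch rather than documented behavior.

```python
import os
from enum import Enum


class Kind(Enum):
    """Hypothetical stand-in mirroring the WorkflowKind members."""

    DEFAULT = "default"
    OFF = "off"
    TASK = "task"
    FLOW = "flow"


def kind_from_env(environ=os.environ, var="PROBPIPE_WORKFLOW_KIND"):
    raw = environ.get(var)
    if raw is None:
        # Assumption: an unset variable behaves like DEFAULT.
        return Kind.DEFAULT
    try:
        # Lower-casing makes "TASK", "Task" and "task" equivalent.
        return Kind(raw.lower())
    except ValueError:
        raise ValueError(f"Unknown {var} value: {raw!r}") from None


from_upper = kind_from_env({"PROBPIPE_WORKFLOW_KIND": "TASK"})
from_unset = kind_from_env({})
```

Passing a mapping instead of reading `os.environ` directly keeps the function easy to exercise without touching the real environment.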