Provenance

Every Distribution or Record returned by a workflow function carries a Provenance record, exposed as its .source attribute, linking it to its inputs and the operation that produced it. The result is a directed acyclic graph: each node is a value, and each edge points from a value to one of its inputs.

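provenance_ancestors(value) returns a flat list of the unique values that went, directly or transitively, into producing value; value itself is not included. provenance_dag(value) walks the same lineage and returns it as a graphviz.Digraph, which is useful for debugging or for displaying the lineage in a notebook. In the rendered graph, edges are drawn from each parent to the child it produced and are labelled with the operation.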

Provenance(operation, parents=(), metadata=dict()) dataclass

Tracks how a distribution was created.

to_dict(*, recurse=True)

Serialize to a JSON-compatible dict.

Parameters:

Name     Type  Default  Description
recurse  bool  True     If True, recursively serialize parent provenance chains. If False, only include parent type/name references.

Source code in probpipe/core/provenance.py
def to_dict(self, *, recurse: bool = True) -> dict[str, Any]:
    """Serialize to a JSON-compatible dict.

    Parameters
    ----------
    recurse : bool
        If True, recursively serialize parent provenance chains.
        If False, only include parent type/name references.
    """
    parent_dicts = []
    for p in self.parents:
        entry: dict[str, Any] = {
            "type": type(p).__name__,
            "name": p.name,
        }
        if recurse and p.source is not None:
            entry["source"] = p.source.to_dict(recurse=True)
        parent_dicts.append(entry)

    # Filter metadata to JSON-serializable values
    safe_metadata = {}
    for k, v in self.metadata.items():
        if isinstance(v, (str, int, float, bool, list, dict, type(None))):
            safe_metadata[k] = v
        else:
            safe_metadata[k] = str(v)

    return {
        "operation": self.operation,
        "parents": parent_dicts,
        "metadata": safe_metadata,
    }
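
To make the effect of recurse concrete, here is a minimal sketch that mirrors the to_dict logic above on stand-in classes. The reduced Provenance and the Node type are hypothetical and exist only for this illustration; they are not the probpipe classes, and the operation names and metadata values are invented.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Provenance:
    """Reduced stand-in that copies the to_dict logic shown above."""
    operation: str
    parents: tuple = ()
    metadata: dict = field(default_factory=dict)

    def to_dict(self, *, recurse: bool = True) -> dict[str, Any]:
        parent_dicts = []
        for p in self.parents:
            entry: dict[str, Any] = {"type": type(p).__name__, "name": p.name}
            if recurse and p.source is not None:
                entry["source"] = p.source.to_dict(recurse=True)
            parent_dicts.append(entry)
        # Non-JSON values are stringified, as in the listing above.
        json_types = (str, int, float, bool, list, dict, type(None))
        safe = {k: v if isinstance(v, json_types) else str(v)
                for k, v in self.metadata.items()}
        return {"operation": self.operation, "parents": parent_dicts, "metadata": safe}


@dataclass
class Node:
    """Hypothetical value type: only .name and .source are needed here."""
    name: str
    source: Provenance | None = None


prior = Node("prior")
posterior = Node("posterior", Provenance("condition", (prior,), {"n_obs": 10}))
summary = Node("summary", Provenance("summarize", (posterior,), {"stat": complex(1, 2)}))

shallow = summary.source.to_dict(recurse=False)  # parents as type/name only
deep = summary.source.to_dict(recurse=True)      # parents carry a nested "source"
```

With recurse=False the single parent entry holds only "type" and "name"; with recurse=True it also holds a "source" key containing the serialized provenance of posterior. A parent whose source is None (prior here) never gets a "source" key in either mode.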

from_dict(d) classmethod

Reconstruct from a dict produced by to_dict.

Parent distributions are not available at deserialization time, so parents will be an empty tuple. The parent information is preserved in the dict under "parents" for inspection.

Source code in probpipe/core/provenance.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Provenance:
    """Reconstruct from a dict produced by :meth:`to_dict`.

    Parent distributions are not available at deserialization time, so
    ``parents`` will be an empty tuple.  The parent information is
    preserved in the dict under ``"parents"`` for inspection.
    """
    return cls(
        operation=d["operation"],
        parents=(),
        metadata={**d.get("metadata", {}), "_parents_info": d.get("parents", [])},
    )
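
The round trip is lossy by design, which a short sketch can make visible. The Provenance class below is a reduced stand-in that copies only the from_dict logic shown above, and the input dict is hand-written in the shape to_dict produces; the "Normal" type name and the metadata values are invented for the example.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Provenance:
    """Reduced stand-in that copies the from_dict logic shown above."""
    operation: str
    parents: tuple = ()
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> Provenance:
        return cls(
            operation=d["operation"],
            parents=(),
            metadata={**d.get("metadata", {}), "_parents_info": d.get("parents", [])},
        )


# Hand-written dict in the shape produced by to_dict (values are illustrative).
serialized = {
    "operation": "condition",
    "parents": [{"type": "Normal", "name": "prior"}],
    "metadata": {"n_obs": 10},
}

restored = Provenance.from_dict(serialized)
parent_names = [p["name"] for p in restored.metadata["_parents_info"]]
```

After deserialization, restored.parents is an empty tuple, and the parent descriptions are reachable only through restored.metadata["_parents_info"]. Because that key is merged in last, an existing metadata entry with the same name would be overwritten.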

provenance_ancestors(node)

Return all ancestor nodes reachable via provenance chains.

Traverses node.source.parents recursively (breadth-first) and returns a flat list of unique ancestors, ordered by discovery. The input node is not included in the result.

Parameters:

Name  Type                                 Default   Description
node  Distribution | Record | RecordArray  required  Any object exposing a .source attribute. The three ProbPipe types that carry provenance satisfy this uniformly.

Source code in probpipe/core/provenance.py
def provenance_ancestors(node: "ProvenanceNode") -> list["ProvenanceNode"]:
    """Return all ancestor nodes reachable via provenance chains.

    Traverses ``node.source.parents`` recursively (breadth-first) and
    returns a flat list of unique ancestors, ordered by discovery.
    The input *node* is **not** included in the result.

    Parameters
    ----------
    node : Distribution | Record | RecordArray
        Any object exposing a ``.source`` attribute. The three ProbPipe
        types that carry provenance satisfy this uniformly.
    """
    visited: set[int] = {id(node)}
    ancestors: list = []
    queue: list = []

    if node.source is not None:
        for p in node.source.parents:
            if id(p) not in visited:
                visited.add(id(p))
                queue.append(p)
                ancestors.append(p)

    while queue:
        current = queue.pop(0)
        if current.source is not None:
            for p in current.source.parents:
                if id(p) not in visited:
                    visited.add(id(p))
                    queue.append(p)
                    ancestors.append(p)

    return ancestors
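
The breadth-first order and the de-duplication are easiest to see on a diamond-shaped lineage, where one input feeds two intermediate values. The sketch below is an illustration under stated assumptions: Source, Node, and ancestors are hypothetical stand-ins that reproduce the traversal above, with collections.deque swapped in for the list-based queue.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass(eq=False)
class Source:
    """Hypothetical stand-in for a provenance record."""
    operation: str
    parents: tuple = ()


@dataclass(eq=False)
class Node:
    """Hypothetical value type exposing .name and .source."""
    name: str
    source: Source | None = None


def ancestors(node: Node) -> list[Node]:
    """Breadth-first walk over .source.parents, tracking identity via id()."""
    visited = {id(node)}
    found: list[Node] = []
    queue = deque([node])
    while queue:
        current = queue.popleft()
        if current.source is None:
            continue
        for p in current.source.parents:
            if id(p) not in visited:
                visited.add(id(p))
                found.append(p)
                queue.append(p)
    return found


# Diamond: a feeds both b and c, which are combined into d.
a = Node("a")
b = Node("b", Source("f", (a,)))
c = Node("c", Source("g", (a,)))
d = Node("d", Source("h", (b, c)))

names = [n.name for n in ancestors(d)]
```

Here names is ["b", "c", "a"]: the direct parents come first, and a appears once even though it is reachable through both b and c. Uniqueness is decided by object identity (id), so two distinct objects that compare equal would both be reported.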

provenance_dag(dist)

Build a Graphviz Digraph of the provenance chain rooted at dist.

Each node is a distribution (labelled with type and name). Edges point from parent to child and are labelled with the operation that produced the child.

Requires the graphviz package. Returns a graphviz.Digraph instance that can be rendered or displayed in a notebook.

Source code in probpipe/core/provenance.py
def provenance_dag(dist: Distribution):
    """Build a Graphviz ``Digraph`` of the provenance chain rooted at *dist*.

    Each node is a distribution (labelled with type and name).  Edges point
    from parent to child and are labelled with the operation that produced
    the child.

    Requires the ``graphviz`` package.  Returns a ``graphviz.Digraph``
    instance that can be rendered or displayed in a notebook.
    """
    try:
        from graphviz import Digraph
    except ImportError:
        raise ImportError(
            "graphviz is required for provenance_dag(). "
            "Install it with: pip install graphviz"
        )

    dot = Digraph(comment="Provenance DAG")
    dot.attr(rankdir="BT")  # bottom-to-top: parents below children

    visited: set[int] = set()

    def _label(d: Distribution) -> str:
        name = d.name or ""
        typename = type(d).__name__
        if name:
            return f"{typename}\n'{name}'"
        return typename

    def _visit(d: Distribution) -> str:
        nid = str(id(d))
        if id(d) in visited:
            return nid
        visited.add(id(d))
        dot.node(nid, _label(d))

        if d.source is not None:
            for p in d.source.parents:
                pid = _visit(p)
                dot.edge(pid, nid, label=d.source.operation)

        return nid

    _visit(dist)
    return dot
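
provenance_dag needs the graphviz package. Where that is not installed, the same depth-first walk can emit DOT text directly. The sketch below is an alternative illustration, not part of probpipe: Source, Node, and to_dot are hypothetical names, and the output is plain DOT source rather than a graphviz.Digraph.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(eq=False)
class Source:
    """Hypothetical stand-in for a provenance record."""
    operation: str
    parents: tuple = ()


@dataclass(eq=False)
class Node:
    """Hypothetical value type exposing .name and .source."""
    name: str
    source: Source | None = None


def to_dot(root: Node) -> str:
    """Emit DOT text with parent -> child edges labelled by operation."""
    lines = ["digraph {", "  rankdir=BT;"]  # parents drawn below children
    visited: set[int] = set()

    def visit(d: Node) -> str:
        nid = f"n{id(d)}"
        if id(d) in visited:
            return nid
        visited.add(id(d))
        # "\\n" writes a literal backslash-n, which DOT reads as a line break.
        label = type(d).__name__ + (f"\\n'{d.name}'" if d.name else "")
        lines.append(f'  {nid} [label="{label}"];')
        if d.source is not None:
            for p in d.source.parents:
                pid = visit(p)
                lines.append(f'  {pid} -> {nid} [label="{d.source.operation}"];')
        return nid

    visit(root)
    lines.append("}")
    return "\n".join(lines)


a = Node("a")
b = Node("b", Source("f", (a,)))
c = Node("c", Source("g", (a,)))
d = Node("d", Source("h", (b, c)))

dot_text = to_dot(d)
```

For the diamond above, dot_text declares four nodes and four edges; the shared input a is declared once and receives two outgoing edges. The text can be saved to a file and passed to any DOT renderer.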