diff --git a/isaaclab_arena/analysis/sensitivity/generate_report.py b/isaaclab_arena/analysis/sensitivity/generate_report.py index 33d409410..f4c649bc8 100644 --- a/isaaclab_arena/analysis/sensitivity/generate_report.py +++ b/isaaclab_arena/analysis/sensitivity/generate_report.py @@ -12,7 +12,16 @@ from isaaclab_arena.analysis.sensitivity.analyzer import SensitivityAnalyzer from isaaclab_arena.analysis.sensitivity.episode_results_reader import dataset_from_episode_results -from isaaclab_arena.analysis.sensitivity.plotting import plot_marginals +from isaaclab_arena.analysis.sensitivity.plotting import plot_corner, plot_importance, plot_marginals + +# Each report plot, mapped to the renderer that draws it. "marginals" writes the given output_path +# directly; the others write a sibling file suffixed with the plot name. +_PLOT_RENDERERS = { + "marginals": plot_marginals, + "importance": plot_importance, + "corner": plot_corner, +} +_DEFAULT_PLOTS = ("marginals", "importance", "corner") def generate_report( @@ -21,26 +30,33 @@ def generate_report( outcome_names: list[str] | tuple[str, ...] = ("success",), factor_names: list[str] | tuple[str, ...] | None = None, observation: list[float] | None = None, + plots: list[str] | tuple[str, ...] = _DEFAULT_PLOTS, seed: int | None = 0, -) -> Path: - """Build a sensitivity report from an episode_results.jsonl, fit, and save a figure. +) -> dict[str, Path]: + """Build a sensitivity report from an episode_results.jsonl, fit, and save the requested figures. - The factor schema is discovered from the recorder's per-episode variation draws. The output - format follows the output_path extension (.png, .pdf, …). + The factor schema is discovered from the recorder's per-episode variation draws, the posterior + is fit and sampled once, and each requested plot is rendered from those shared samples. The + output format follows the output_path extension (.png, .pdf, …). Args: episode_results_path: episode_results.jsonl produced by the per-episode recorder. - output_path: Destination figure file (parent dirs created if absent). + output_path: Destination figure file for the marginals plot; the other plots are written + as siblings suffixed with their name (e.g. report_importance.png). Parent dirs created. outcome_names: Which per-episode outcome(s) to condition on. factor_names: Which recorded variations to analyze. None analyzes all of them. observation: Outcome values to condition on, one per outcome name. None conditions on success (1) for every binary outcome. + plots: Which plots to render, any of "marginals", "importance", "corner". seed: Seed for torch's global RNG so a report is reproducible. Pass None to leave the RNG untouched. Returns: - The resolved output path. + A mapping of each rendered plot name to its written path. """ + unknown = [name for name in plots if name not in _PLOT_RENDERERS] + assert not unknown, f"Unknown plot(s) {unknown}; choose from {sorted(_PLOT_RENDERERS)}." + # Estimator training (fit) and posterior sampling both draw from torch's global RNG in # sequence, so seeding once here makes the whole report reproducible. if seed is not None: @@ -54,11 +70,17 @@ def generate_report( dataset.default_observation() if observation is None else torch.tensor(observation, dtype=torch.float32) ) samples = analyzer.sample_posterior(observation_tensor) + output_path = Path(output_path) - plot_marginals(samples, dataset, observation_tensor, output_path=str(output_path)) + written: dict[str, Path] = {} + for name in plots: + # The marginals plot keeps output_path; the others get a name-suffixed sibling. + plot_path = output_path if name == "marginals" else output_path.with_stem(f"{output_path.stem}_{name}") + _PLOT_RENDERERS[name](samples, dataset, observation_tensor, output_path=str(plot_path)) + written[name] = plot_path plt.close("all") - print(f"[INFO] Wrote report → {output_path}") - return output_path + print(f"[INFO] Wrote report → {', '.join(str(path) for path in written.values())}") + return written def main(): @@ -107,6 +129,14 @@ def main(): "Outcomes are binary, so use 1 for success or 0 for failure. Defaults to 1 (success)." ), ) + parser.add_argument( + "--plots", + type=str, + nargs="+", + choices=sorted(_PLOT_RENDERERS), + default=list(_DEFAULT_PLOTS), + help="Which plots to render. Default: marginals importance corner.", + ) parser.add_argument( "--seed", type=int, @@ -121,6 +151,7 @@ def main(): outcome_names=args.outcome, factor_names=args.factors, observation=args.observation, + plots=args.plots, seed=args.seed, ) diff --git a/isaaclab_arena/analysis/sensitivity/marginals.py b/isaaclab_arena/analysis/sensitivity/marginals.py new file mode 100644 index 000000000..98d641f8d --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/marginals.py @@ -0,0 +1,125 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Pure (numpy-only) estimators over posterior samples: marginal densities and factor importance. + +These functions take already-drawn posterior samples and return summary arrays. They run no +inference and do no plotting, so the static report and the interactive app share one source of +truth for what a marginal and an importance score mean. +""" + +from __future__ import annotations + +import numpy as np +from scipy.stats import gaussian_kde +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import torch + + from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec, SensitivityDataset + +_CONSTANT_STD = 1e-9 +"""Below this sample std a continuous posterior is treated as a point mass (KDE is undefined).""" + + +def continuous_marginal_density( + factor_samples: np.ndarray, value_range: tuple[float, float], num_grid: int = 200 +) -> tuple[np.ndarray, np.ndarray | None]: + """KDE of a continuous factor's posterior samples on a grid over its swept range. + + Args: + factor_samples: 1D posterior draws for one continuous factor, in original units. + value_range: The (low, high) the factor was swept over; the grid spans it. + num_grid: Number of evenly spaced grid points across the range. + + Returns: + ``(grid, density)`` where density is the KDE evaluated on grid. density is None when the + samples are effectively constant (a point mass, where a KDE is undefined) — callers draw + that as a single line at the sample mean. + """ + range_low, range_high = value_range + grid = np.linspace(range_low, range_high, num_grid) + if float(np.std(factor_samples)) < _CONSTANT_STD: + return grid, None + density = gaussian_kde(factor_samples)(grid) + return grid, density + + +def categorical_marginal_probs(factor_samples: np.ndarray, num_choices: int) -> np.ndarray: + """Posterior probability per integer-coded choice of a categorical factor. + + sbi returns categorical columns as floats over the integer-code support, so samples are + rounded to the nearest code in [0, num_choices - 1] and tallied into frequencies. + + Args: + factor_samples: 1D posterior draws for one categorical factor (float-coded). + num_choices: Number of choices the factor can take. + + Returns: + A length-``num_choices`` array of probabilities summing to 1. + """ + codes = np.clip(np.round(factor_samples), 0, num_choices - 1).astype(int) + return np.bincount(codes, minlength=num_choices) / len(codes) + + +def factor_importance(factor_samples: np.ndarray, factor: FactorSpec) -> float: + """How far a factor's posterior marginal moved from its uniform prior, in [0, 1]. + + The score is the total-variation distance between the posterior marginal and the uniform + prior the factor was swept from, normalized so 1 means the posterior collapsed onto a single + value and 0 means it is indistinguishable from the prior (the factor did not affect the + outcome). Continuous and categorical factors share the [0, 1] scale, so a single ranking + compares them directly. + + Args: + factor_samples: 1D posterior draws for this factor, in the factor's own units/codes. + factor: The factor spec (its type, range, and choices define the prior to compare against). + """ + if factor.type == "continuous": + assert factor.range is not None, f"Continuous factor {factor.name!r} has no range." + grid, density = continuous_marginal_density(factor_samples, factor.range) + if density is None: + return 1.0 # a point mass is maximally far from the uniform prior + span = factor.range[1] - factor.range[0] + if span <= 0: + return 0.0 + # Normalize the KDE over the (truncated) range so it integrates to 1 there, then compare + # it to the uniform prior 1/span. TV = 0.5 * integral|posterior - prior|, already in [0, 1]. + area = np.trapz(density, grid) + if area <= 0: + return 0.0 + posterior = density / area + return float(0.5 * np.trapz(np.abs(posterior - 1.0 / span), grid)) + + assert factor.choices is not None, f"Categorical factor {factor.name!r} has no choices." + num_choices = len(factor.choices) + probs = categorical_marginal_probs(factor_samples, num_choices) + total_variation = 0.5 * float(np.sum(np.abs(probs - 1.0 / num_choices))) + # A categorical TV maxes out at 1 - 1/k (all mass on one choice), so rescale to [0, 1] to keep + # it comparable with the continuous score. + ceiling = 1.0 - 1.0 / num_choices + return total_variation / ceiling if ceiling > 0 else 0.0 + + +def factor_importances(samples: torch.Tensor, dataset: SensitivityDataset) -> list[tuple[str, float]]: + """Importance score for every factor, sorted most- to least-important. + + A thin dataset-level wrapper over factor_importance: it slices each factor's column out of the + joint posterior samples and scores it against its prior. + + Args: + samples: ``(num_samples, num_factors)`` posterior draws in the dataset's factor layout. + dataset: The dataset, for the factor schema and column layout. + + Returns: + ``(factor_name, score)`` pairs sorted by descending score; each score is in [0, 1]. + """ + sample_array = samples.cpu().numpy() + scored = [ + (factor.name, factor_importance(sample_array[:, dataset.factor_columns[factor.name]].squeeze(-1), factor)) + for factor in dataset.factors + ] + return sorted(scored, key=lambda name_score: name_score[1], reverse=True) diff --git a/isaaclab_arena/analysis/sensitivity/plotting.py b/isaaclab_arena/analysis/sensitivity/plotting.py index e00f4d062..e24707ce7 100644 --- a/isaaclab_arena/analysis/sensitivity/plotting.py +++ b/isaaclab_arena/analysis/sensitivity/plotting.py @@ -10,9 +10,14 @@ import numpy as np import re from pathlib import Path -from scipy.stats import gaussian_kde from typing import TYPE_CHECKING +from isaaclab_arena.analysis.sensitivity.marginals import ( + categorical_marginal_probs, + continuous_marginal_density, + factor_importances, +) + if TYPE_CHECKING: import torch @@ -21,6 +26,7 @@ _CONTINUOUS_COLOR = "steelblue" _CATEGORICAL_COLOR = "steelblue" _PRIOR_COLOR = "grey" +_IMPORTANT_COLOR = "indianred" def plot_marginals( @@ -94,20 +100,26 @@ def plot_marginals( return figure -def _draw_continuous_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray) -> None: +def _draw_continuous_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray, compact: bool = False) -> None: """Posterior density of a continuous factor over its swept range. Draws the KDE of the posterior samples, the uniform prior as a flat reference, and shades the central 5-95% of the posterior. Reading the posterior against the prior shows whether conditioning on the outcome concentrated the factor, which a mean alone would miss for a factor swept symmetrically around its nominal value. + + Args: + ax: Axis to draw on. + factor: The continuous factor (its range bounds the x-axis and the prior). + factor_samples: 1D posterior draws for this factor. + compact: Drop the legend and axis labels, for use as a corner-plot diagonal where edge + axes carry the labels. """ range_low, range_high = factor.range span = range_high - range_low - if float(np.std(factor_samples)) >= 1e-9: - grid = np.linspace(range_low, range_high, 200) - density = gaussian_kde(factor_samples)(grid) + grid, density = continuous_marginal_density(factor_samples, factor.range) + if density is not None: ax.plot(grid, density, color=_CONTINUOUS_COLOR, linewidth=2, label="posterior") ax.fill_between(grid, 0, density, color=_CONTINUOUS_COLOR, alpha=0.2) ax.set_ylim(bottom=0) @@ -122,27 +134,234 @@ def _draw_continuous_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray ax.axhline(1.0 / span, color=_PRIOR_COLOR, linestyle="--", linewidth=1.5, label="prior (uniform)") ax.set_xlim(range_low, range_high) + ax.grid(alpha=0.3) + if compact: + return ax.set_xlabel(factor.name) ax.set_ylabel("posterior density") ax.legend(loc="best", fontsize=9) - ax.grid(alpha=0.3) -def _draw_categorical_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray) -> None: +def _draw_categorical_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray, compact: bool = False) -> None: """Bar chart of a categorical factor's posterior probability per choice. sbi returns categorical columns as floats over the integer-code support, so samples are rounded to the nearest code in [0, num_choices - 1] and tallied into frequencies. + + Args: + ax: Axis to draw on. + factor: The categorical factor (its choices label the bars). + factor_samples: 1D posterior draws for this factor (float-coded). + compact: Drop the axis labels, for use as a corner-plot diagonal. """ assert factor.choices is not None num_choices = len(factor.choices) - codes = np.clip(np.round(factor_samples), 0, num_choices - 1).astype(int) - probabilities = np.bincount(codes, minlength=num_choices) / len(codes) + probabilities = categorical_marginal_probs(factor_samples, num_choices) ax.bar(range(num_choices), probabilities, color=_CATEGORICAL_COLOR, alpha=0.8) ax.set_xticks(range(num_choices)) ax.set_xticklabels(factor.choices, rotation=30, ha="right") - ax.set_xlabel(factor.name) - ax.set_ylabel("posterior probability") ax.set_ylim(0, 1.05) ax.grid(alpha=0.3, axis="y") + if compact: + return + ax.set_xlabel(factor.name) + ax.set_ylabel("posterior probability") + + +def plot_corner( + samples: torch.Tensor, + dataset: SensitivityDataset, + observation: torch.Tensor, + output_path: str | None = None, +): + """Corner plot: per-factor marginals on the diagonal, pairwise joints below it. + + A pure renderer over already-sampled posterior draws. The diagonal repeats the 1D marginals; + each lower-triangle cell is the joint posterior of its column-factor (x) and row-factor (y), + which is where factor interactions show up (e.g. the outcome needing low light *and* a small + camera offset together) — something the 1D marginals cannot reveal. + + Args: + samples: ``(num_samples, num_factors)`` posterior draws in the dataset's factor layout. + dataset: The dataset, for the factor schema and column layout. + observation: The outcome vector the samples were conditioned on (shown in the title). + output_path: If given, save the figure here. The format follows the path's extension + (.png, .pdf, …); parent directories are created. + + Returns: + The matplotlib Figure. + """ + sample_array = samples.cpu().numpy() + factors = dataset.factors + num_factors = len(factors) + columns = dataset.factor_columns + + def factor_samples(factor: FactorSpec) -> np.ndarray: + return sample_array[:, columns[factor.name]].squeeze(-1) + + figure, axes = plt.subplots(num_factors, num_factors, figsize=(3.2 * num_factors, 3.2 * num_factors), squeeze=False) + for row in range(num_factors): + for col in range(num_factors): + ax = axes[row][col] + if col > row: + ax.axis("off") # upper triangle is the mirror of the lower, so leave it blank + continue + factor_x, factor_y = factors[col], factors[row] + if row == col: + if factor_x.type == "continuous": + _draw_continuous_marginal(ax, factor_x, factor_samples(factor_x), compact=True) + else: + _draw_categorical_marginal(ax, factor_x, factor_samples(factor_x), compact=True) + else: + _draw_joint(ax, factor_x, factor_y, factor_samples(factor_x), factor_samples(factor_y)) + # Label only the edge axes (bottom row gets x names, left column gets y names). + if row == num_factors - 1: + ax.set_xlabel(factor_x.name, fontsize=9) + if col == 0: + ax.set_ylabel(factor_y.name, fontsize=9) + + observation_label = ", ".join( + f"{name}={value:g}" for name, value in zip(dataset.outcome_names, observation.tolist()) + ) + figure.suptitle( + f"Posterior corner plot — {dataset.num_episodes} episodes (observed: {observation_label})", + fontsize=13, + fontweight="bold", + ) + figure.tight_layout(rect=[0, 0, 1, 0.97]) + + if output_path is not None: + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + figure.savefig(output_path, dpi=150, bbox_inches="tight") + return figure + + +def _draw_joint(ax, factor_x: FactorSpec, factor_y: FactorSpec, samples_x: np.ndarray, samples_y: np.ndarray) -> None: + """Joint posterior of two factors, dispatched by their type combination. + + continuous × continuous draws a 2D-histogram heatmap; continuous × categorical draws a violin + of the continuous factor per category; categorical × categorical draws a joint-probability + heatmap. factor_x is the horizontal axis and factor_y the vertical one. + """ + x_continuous = factor_x.type == "continuous" + y_continuous = factor_y.type == "continuous" + if x_continuous and y_continuous: + ax.hist2d(samples_x, samples_y, bins=40, range=[factor_x.range, factor_y.range], cmap="Blues") + ax.set_xlim(*factor_x.range) + ax.set_ylim(*factor_y.range) + elif x_continuous and not y_continuous: + _draw_violin_per_category(ax, factor_y, samples_y, factor_x, samples_x, continuous_axis="x") + elif not x_continuous and y_continuous: + _draw_violin_per_category(ax, factor_x, samples_x, factor_y, samples_y, continuous_axis="y") + else: + _draw_categorical_heatmap(ax, factor_x, samples_x, factor_y, samples_y) + ax.grid(alpha=0.3) + + +def _draw_violin_per_category( + ax, + categorical_factor: FactorSpec, + categorical_samples: np.ndarray, + continuous_factor: FactorSpec, + continuous_samples: np.ndarray, + continuous_axis: str, +) -> None: + """Violins of a continuous factor split by a categorical factor's choices. + + Each violin is the continuous factor's distribution among the posterior draws taking one + categorical choice. continuous_axis ("x" or "y") says which axis the continuous factor runs + along, so the same routine serves both lower-triangle orientations. + """ + assert categorical_factor.choices is not None + num_choices = len(categorical_factor.choices) + codes = np.clip(np.round(categorical_samples), 0, num_choices - 1).astype(int) + grouped = [continuous_samples[codes == choice] for choice in range(num_choices)] + # violinplot rejects empty groups, so drop choices no posterior draw landed on. + positions = [choice for choice, values in enumerate(grouped) if len(values) > 0] + populated = [values for values in grouped if len(values) > 0] + if not populated: + return + vertical = continuous_axis == "y" + ax.violinplot(populated, positions=positions, vert=vertical, showmeans=True, widths=0.8) + if vertical: + ax.set_xticks(range(num_choices)) + ax.set_xticklabels(categorical_factor.choices, rotation=30, ha="right", fontsize=8) + ax.set_ylim(*continuous_factor.range) + else: + ax.set_yticks(range(num_choices)) + ax.set_yticklabels(categorical_factor.choices, fontsize=8) + ax.set_xlim(*continuous_factor.range) + + +def _draw_categorical_heatmap( + ax, factor_x: FactorSpec, samples_x: np.ndarray, factor_y: FactorSpec, samples_y: np.ndarray +) -> None: + """Heatmap of the joint posterior probability over two categorical factors' choices.""" + assert factor_x.choices is not None and factor_y.choices is not None + num_x, num_y = len(factor_x.choices), len(factor_y.choices) + codes_x = np.clip(np.round(samples_x), 0, num_x - 1).astype(int) + codes_y = np.clip(np.round(samples_y), 0, num_y - 1).astype(int) + # joint[row=y, col=x] = P(x=col, y=row); origin="lower" puts code 0 at the bottom-left. + joint = np.zeros((num_y, num_x)) + np.add.at(joint, (codes_y, codes_x), 1.0) + joint /= joint.sum() + ax.imshow(joint, origin="lower", aspect="auto", cmap="Blues", extent=(-0.5, num_x - 0.5, -0.5, num_y - 0.5)) + ax.set_xticks(range(num_x)) + ax.set_xticklabels(factor_x.choices, rotation=30, ha="right", fontsize=8) + ax.set_yticks(range(num_y)) + ax.set_yticklabels(factor_y.choices, fontsize=8) + + +def plot_importance( + samples: torch.Tensor, + dataset: SensitivityDataset, + observation: torch.Tensor, + output_path: str | None = None, +): + """Tornado chart ranking factors by how far their posterior moved from the prior. + + A pure renderer over already-sampled posterior draws. Each bar is a factor's importance score + in [0, 1] (see marginals.factor_importance): near 0 the factor barely affected the outcome, + near 1 the outcome pins it down. Bars are sorted most- to least-important, the most important + on top, giving the report a single "what matters" summary across continuous and categorical + factors alike. + + Args: + samples: ``(num_samples, num_factors)`` posterior draws in the dataset's factor layout. + dataset: The dataset, for the factor schema and column layout. + observation: The outcome vector the samples were conditioned on (shown in the title). + output_path: If given, save the figure here. The format follows the path's extension + (.png, .pdf, …); parent directories are created. + + Returns: + The matplotlib Figure. + """ + scored = factor_importances(samples, dataset) + names = [name for name, _ in scored] + scores = [score for _, score in scored] + + # barh fills bottom-up, so reverse to put the most important factor on top. + positions = range(len(names)) + figure, ax = plt.subplots(figsize=(8.0, max(2.0, 0.5 * len(names) + 1.0))) + ax.barh(positions, scores[::-1], color=_IMPORTANT_COLOR, alpha=0.85) + ax.set_yticks(positions) + ax.set_yticklabels(names[::-1]) + ax.set_xlim(0, 1) + ax.set_xlabel("importance (total-variation distance from prior, 0–1)") + ax.grid(alpha=0.3, axis="x") + + observation_label = ", ".join( + f"{name}={value:g}" for name, value in zip(dataset.outcome_names, observation.tolist()) + ) + ax.set_title( + f"Factor importance — {dataset.num_episodes} episodes (observed: {observation_label})", + fontsize=12, + fontweight="bold", + ) + figure.tight_layout() + + if output_path is not None: + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + figure.savefig(output_path, dpi=150, bbox_inches="tight") + return figure diff --git a/isaaclab_arena/tests/sensitivity_synthetic.py b/isaaclab_arena/tests/sensitivity_synthetic.py index b2b4392be..cd8d21a4d 100644 --- a/isaaclab_arena/tests/sensitivity_synthetic.py +++ b/isaaclab_arena/tests/sensitivity_synthetic.py @@ -26,10 +26,11 @@ import argparse import torch from dataclasses import dataclass +from pathlib import Path from isaaclab_arena.analysis.sensitivity.analyzer import SensitivityAnalyzer from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec, SensitivityDataset -from isaaclab_arena.analysis.sensitivity.plotting import plot_marginals +from isaaclab_arena.analysis.sensitivity.plotting import plot_corner, plot_importance, plot_marginals @dataclass(frozen=True) @@ -188,8 +189,13 @@ def _demo(): analyzer.fit() observation = dataset.default_observation() samples = analyzer.sample_posterior(observation) - plot_marginals(samples, dataset, observation, output_path=args.output) - print(f"[INFO] Wrote synthetic sensitivity report → {args.output}") + + # Render all three report plots from the one posterior sample, each to a name-suffixed file. + output = Path(args.output) + for name, render in (("marginals", plot_marginals), ("importance", plot_importance), ("corner", plot_corner)): + plot_path = output if name == "marginals" else output.with_stem(f"{output.stem}_{name}") + render(samples, dataset, observation, output_path=str(plot_path)) + print(f"[INFO] Wrote synthetic {name} plot → {plot_path}") if __name__ == "__main__":