diff --git a/docs/pages/concepts/policy/concept_sensitivity_analysis.rst b/docs/pages/concepts/policy/concept_sensitivity_analysis.rst new file mode 100644 index 0000000000..82793670f8 --- /dev/null +++ b/docs/pages/concepts/policy/concept_sensitivity_analysis.rst @@ -0,0 +1,173 @@ +Sensitivity Analysis +==================== + +The sensitivity-analysis toolbox answers a single question about a policy: +*which environment conditions drive success?* Given the per-episode results of an +evaluation sweep — where factors such as lighting, object mass, or table material were +varied — it fits a posterior over those factors conditioned on the outcome (e.g. success +rate) and renders one figure summarising which factor values are associated with success. + +Two distinct ideas are at work. *Joint* means all factors are modelled together rather than +one at a time, which is what captures interactions and confounds (see the next section). +*Posterior* means the result is conditioned on the outcome: starting from the prior — the +factor values the sweep actually drew, uniform over the declared ranges — it reweights them +by how often each led to the chosen outcome. So the figure answers *given success, which +factor values were in play?*, not merely *how were the factors distributed in the sweep?* + +Why a joint posterior, not a success rate per factor? +----------------------------------------------------- + +The simplest analysis would chart a success rate for each factor independently. That hides +the two things that matter most in a multi-factor sweep: + +- **Factors interact.** How much light a policy needs can depend on the object — a matte + object may succeed at low light while a shiny one needs far more. A per-factor + "success vs light" curve averages over objects and reports one blurry gate that is wrong + for both. The joint posterior keeps the interaction, so you can condition on a specific + object and see its gate. 
+- **Factors confound each other.** If bright-light episodes also happened to use an easy + object, a per-factor light chart cannot tell which one drove success. Modelling all + factors together attributes the effect to the factor that actually carries it. + +The per-factor rate is a projection of the joint posterior — derivable from it, but not the +other way around. The toolbox therefore always fits the joint — via simulation-based +inference (MNPE or NPE) — and reads the per-factor marginals from it. + +How it works +------------ + +The toolbox is a thin analysis layer over `sbi `_'s +neural posterior estimators. The flow is: + +1. **Per-episode input.** The analysis reads an ``episode_summary.jsonl`` — one row per + episode, holding that episode's factor values and outcomes. +2. **Schema.** A ``factors.yaml`` declares the *factors* — which ``arena_env_args`` columns + were varied and whether each is continuous or categorical, plus the continuous ranges + that were swept (so the analyzer's prior matches the simulation). It does **not** list + outcomes — *which* outcome to condition on is chosen at analysis time, not saved here. +3. **Inference.** ``SensitivityAnalyzer`` loads the pair, trains an estimator on the full + ``(theta, x)`` jointly — sbi's terms for the factor values (``theta``) and the per-episode + outcomes (``x``) — and samples the joint posterior conditioned on a chosen observation + (by default, success). +4. **Report.** A probability density curve for each continuous factor and a probability bar + chart for each categorical factor. + +.. todo:: + + The eval-runner writer (``episode_writer``) that emits ``episode_summary.jsonl`` during + evaluation is not part of this version — it lands in a follow-up. For now, run the analysis + on synthetic data (see below) or on a JSONL produced externally. + +Inputs +------ + +**factors.yaml** declares only the factors that were varied (and the continuous ranges that +were swept). 
Outcomes are not declared here — they're selected at analysis time (see below): + +.. code-block:: yaml + + factors: + light_intensity: + type: continuous + range: [[0.0, 5000.0]] # the swept range; inferred from the data's min/max if omitted + table_material: + type: categorical + choices: [oak, walnut, bamboo] + +**episode_summary.jsonl** holds one JSON object per episode. It carries every measured +outcome; the analysis picks which one(s) to condition on: + +.. code-block:: json + + {"job_name": "pi0_sweep", "episode_idx": 0, + "arena_env_args": {"light_intensity": 3200.0, "table_material": "oak"}, + "outcomes": {"success": 1}} + +Choice of estimator +------------------- + +``SensitivityAnalyzer`` picks the estimator from the schema automatically: + +.. list-table:: + :header-rows: 1 + :widths: 25 25 50 + + * - Schema + - Estimator + - Notes + * - Any categorical factor + - MNPE + - Mixed density estimator; handles continuous + categorical factors together. + * - All continuous factors + - NPE + - Restricts to a Gaussian on a single factor, so a meaningful continuous-only + analysis needs at least two continuous factors. + +Continuous factors are normalised to ``[0, 1]`` before fitting and de-normalised when +sampling, so factors on very different scales (e.g. light in the thousands, an offset in +the hundredths) train on equal footing. Outcomes are binary (0/1); the default query +conditions on success (1). + +Running a report +---------------- + +Point the report generator at a ``(factors.yaml, episode_summary.jsonl)`` pair. The output +format follows the file extension (``.png``, ``.pdf``, …); reports are written under +``eval/`` by default. + +.. 
code-block:: bash + + python -m isaaclab_arena.analysis.sensitivity.generate_report \ + --factors_yaml factors.yaml \ + --episode_summary episode_summary.jsonl \ + --outcome success \ + --output eval/sensitivity_report.png + +``--outcome`` selects which per-episode outcome(s) to condition on (keys in the rows' +``outcomes`` block); it defaults to ``success``. Pass ``--observation`` to set the value +per outcome — since outcomes are binary, use ``1`` for success or ``0`` for failure; it +defaults to ``1`` (success). + +Trying it on synthetic data +--------------------------- + +A synthetic simulator with a *known* ground truth lets you run the whole pipeline without +Isaac Sim — useful for seeing the output shape and for validating the toolbox +(the recovered posterior should reflect the planted relationship): + +.. code-block:: bash + + # mixed: three continuous + two categorical factors (MNPE) + python -m isaaclab_arena.tests.sensitivity_synthetic --kind mixed --output eval/demo.png + +``--kind`` also accepts ``continuous`` (continuous-only factors, which exercises the NPE path). + +Reading the output +------------------ + +.. todo:: + + Add a sample report figure here and walk through reading it. + +Each panel is the posterior over one factor *conditioned on success*. Intuitively it answers +"given the policy succeeded, which values of this factor were responsible?" More precisely, +among the successful episodes it shows the probability density that the factor took each +value. For a continuous factor, mass concentrated at one end of its range means success +favoured that end — e.g. a curve rising toward bright light means successful episodes were +almost all bright ones, i.e. the policy needs bright light to succeed. +For a categorical factor, the tallest bar is the value most associated with success. + +Current scope +------------- + +- Outcomes are treated as **binary** (0/1). 
Conditioning defaults to success; a continuous + outcome is rejected with a clear error rather than silently averaged. +- Continuous **vector** factors (``dim > 1``) are reserved for a future extension. The likely + approach is to record scalar reductions (e.g. a norm or distance-to-reference) alongside the + raw vector, so a pose or RGB factor becomes one or more analysable scalar columns. +- The estimators run on CPU and do not require Isaac Sim, so a report can be generated + anywhere the evaluation JSONL is available. +- The analysis assumes the ``episode_summary.jsonl`` is a single coherent slice — one + policy, task, and embodiment. **TODO:** add a filter (in the spirit of robolab's + ``--filter-policy`` / ``--filter-task``) to select that slice from a larger JSONL, + rather than relying on the caller to pre-filter it. diff --git a/docs/pages/concepts/policy/index.rst b/docs/pages/concepts/policy/index.rst index 8fb97a554e..870e226b9c 100644 --- a/docs/pages/concepts/policy/index.rst +++ b/docs/pages/concepts/policy/index.rst @@ -91,3 +91,4 @@ More details :maxdepth: 1 concept_evaluation_types + concept_sensitivity_analysis diff --git a/isaaclab_arena/analysis/__init__.py b/isaaclab_arena/analysis/__init__.py new file mode 100644 index 0000000000..16ea4c2183 --- /dev/null +++ b/isaaclab_arena/analysis/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/isaaclab_arena/analysis/sensitivity/__init__.py b/isaaclab_arena/analysis/sensitivity/__init__.py new file mode 100644 index 0000000000..16ea4c2183 --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. 
+# +# SPDX-License-Identifier: Apache-2.0 diff --git a/isaaclab_arena/analysis/sensitivity/analyzer.py b/isaaclab_arena/analysis/sensitivity/analyzer.py new file mode 100644 index 0000000000..cca176797a --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/analyzer.py @@ -0,0 +1,109 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import torch + +from sbi.inference import MNPE, NPE +from sbi.utils import BoxUniform + +from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset + + +class SensitivityAnalyzer: + """Fits a neural posterior over all factors, conditioned on all outcomes. + + Picks the sbi estimator from the schema: + + - MNPE when any factor is categorical (it handles mixed continuous + categorical theta). + - NPE when every factor is continuous. + + Following sbi's convention, ``theta`` is the per-episode factor values (the inputs the + posterior is inferred over) and ``x`` is the per-episode outcomes (the observations a query + conditions on). It trains on the full (theta, x) and samples the joint posterior at a chosen + observation. The single observation conditions on *all* outcome columns at once, so a + query like "which factors produced success?" is answered for every factor jointly. + + Continuous factors are normalized to [0, 1] before fitting and denormalized when + sampling, so factors on very different scales (e.g. light in thousands, an offset in + hundredths) train on equal footing. Categorical columns keep their integer codes. 
+ """ + + def __init__(self, dataset: SensitivityDataset): + self.dataset = dataset + self.posterior = None + continuous_factors = [factor for factor in dataset.schema.factors if factor.type == "continuous"] + # theta is laid out continuous-first then categorical (built that way by + # SensitivityDataset and defined by FactorSchema.factor_columns), so the leading + # self._num_continuous columns are the continuous factors that _normalize/_denormalize slice. + self._num_continuous = len(continuous_factors) + for factor in continuous_factors: + assert factor.range is not None, ( + f"Continuous factor {factor.name!r} has no range to normalize against. Declare a" + " range in factors.yaml, or build the dataset via SensitivityDataset.from_files() so the" + " range is inferred from the data before constructing the analyzer." + ) + self._continuous_low = torch.tensor([factor.range[0][0] for factor in continuous_factors]) + self._continuous_high = torch.tensor([factor.range[0][1] for factor in continuous_factors]) + + def _select_inference_class(self): + """Choose the sbi inference class for this schema. + + Returns MNPE when any factor is categorical (its mixed density estimator handles + continuous + categorical theta together), and NPE when every factor is continuous. 
+ """ + return MNPE if self.dataset.has_categorical_factors else NPE + + def _normalized_prior(self): + """Uniform prior matching the normalized theta: continuous dims [0, 1], categoricals [0, k-1].""" + low_bounds = [0.0] * self._num_continuous + high_bounds = [1.0] * self._num_continuous + for factor in self.dataset.schema.factors: + if factor.type == "categorical": + low_bounds.append(0.0) + high_bounds.append(float(len(factor.choices) - 1)) + return BoxUniform(low=torch.tensor(low_bounds), high=torch.tensor(high_bounds)) + + def _normalize(self, theta: torch.Tensor) -> torch.Tensor: + """Scale the continuous (leading) theta columns to [0, 1]; leave categoricals untouched.""" + normalized = theta.clone() + span = (self._continuous_high - self._continuous_low).clamp_min(1e-12) + normalized[:, : self._num_continuous] = (theta[:, : self._num_continuous] - self._continuous_low) / span + return normalized + + def _denormalize(self, theta: torch.Tensor) -> torch.Tensor: + """Inverse of _normalize: map the continuous columns back to their original ranges.""" + denormalized = theta.clone() + span = self._continuous_high - self._continuous_low + denormalized[:, : self._num_continuous] = theta[:, : self._num_continuous] * span + self._continuous_low + return denormalized + + def fit(self, training_batch_size: int = 50): + """Train the estimator on the full (theta, x); store and return the fitted posterior.""" + print( + f"[INFO] SensitivityAnalyzer: fitting {self._select_inference_class().__name__} on" + f" {self.dataset.num_episodes} episodes" + f" (theta dim={self.dataset.theta.shape[1]}, x dim={self.dataset.x.shape[1]})." 
+ ) + inference = self._select_inference_class()(prior=self._normalized_prior()) + inference.append_simulations(self._normalize(self.dataset.theta), self.dataset.x) + density_estimator = inference.train(training_batch_size=training_batch_size) + self.posterior = inference.build_posterior(density_estimator) + return self.posterior + + def sample_posterior(self, observation: torch.Tensor | None = None, num_samples: int = 5000) -> torch.Tensor: + """Sample the joint posterior over all factors at observation. + + Defaults to the dataset's default observation (condition on success). Returns a + (num_samples, total_factor_dim) tensor laid out like theta — continuous columns first + (in original, denormalized units), then integer-coded categorical columns. + """ + assert self.posterior is not None, "Call fit() before sampling the posterior" + if observation is None: + observation = self.dataset.default_observation() + with torch.no_grad(): + normalized_samples = self.posterior.sample((num_samples,), x=observation) + return self._denormalize(normalized_samples) diff --git a/isaaclab_arena/analysis/sensitivity/dataset.py b/isaaclab_arena/analysis/sensitivity/dataset.py new file mode 100644 index 0000000000..c4bac0a610 --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/dataset.py @@ -0,0 +1,327 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import json +import torch +import yaml +from dataclasses import dataclass +from enum import Enum +from pathlib import Path + + +class FactorType(str, Enum): + """Whether a factor's values are continuous (numeric range) or categorical (labelled choices).""" + + CONTINUOUS = "continuous" + CATEGORICAL = "categorical" + + +@dataclass +class FactorSpec: + """One factor's schema as declared in factors.yaml. 
+ + Continuous factors carry a range (one [low, high] pair per dim); categorical + factors carry choices (a list of string labels, integer-encoded by index in theta). + """ + + name: str + type: FactorType + dim: int = 1 + range: list[tuple[float, float]] | None = None # one (low, high) pair per dim, continuous only + choices: list[str] | None = None # categorical only + + def __post_init__(self) -> None: + # Accept the raw string form (from YAML / callers) and normalize to the enum. + self.type = FactorType(self.type) + # Normalize each (low, high) pair to a tuple (YAML/JSON deliver them as lists). + if self.range is not None: + self.range = [tuple(pair) for pair in self.range] + + +@dataclass +class FactorSchema: + """Parsed factors.yaml — the list of factors that were varied. + + The schema describes what *can* vary (continuous vs categorical, range/choices), not the + values taken in any given episode. Outcomes are not part of the schema; which outcome to + condition on is chosen at analysis time. + """ + + factors: list[FactorSpec] + + @classmethod + def from_yaml(cls, path: str | Path) -> FactorSchema: + """Load a factors.yaml from disk into a typed FactorSchema. + + The YAML has one top-level block, factors (one entry per varied input). Each factor's + type must be continuous or categorical. + """ + # TODO: add a robolab-style filter (e.g. select rows by policy/task/embodiment) so a + # single episode_summary.jsonl can be sliced to one coherent (policy, task, embodiment) + # before analysis, instead of assuming the caller pre-filtered it. 
+ with open(path, encoding="utf-8") as yaml_file: + yaml_data = yaml.safe_load(yaml_file) + assert isinstance(yaml_data, dict), f"factors.yaml at {path} must be a mapping at top level" + assert "factors" in yaml_data, f"factors.yaml at {path} is missing top-level `factors:` block" + + factors: list[FactorSpec] = [] + for factor_name, factor_block in yaml_data["factors"].items(): + assert "type" in factor_block, ( + f"factors.yaml at {path} factor {factor_name!r} is missing required `type:` field" + " (expected 'continuous' or 'categorical')" + ) + factor_type = factor_block["type"] + assert factor_type in ("continuous", "categorical"), ( + f"factors.yaml at {path} factor {factor_name!r} has unknown type {factor_type!r};" + " expected 'continuous' or 'categorical'" + ) + factors.append( + FactorSpec( + name=factor_name, + type=factor_type, + dim=factor_block.get("dim", 1), + range=factor_block.get("range"), + choices=factor_block.get("choices"), + ) + ) + + return cls(factors=factors) + + @property + def total_factor_dim(self) -> int: + """Total width of theta — sum of dim over continuous factors plus 1 per categorical.""" + return sum(factor.dim if factor.type == "continuous" else 1 for factor in self.factors) + + @property + def factor_columns(self) -> dict[str, slice]: + """Map factor name → its column slice in theta. + + Continuous factors occupy the leading columns (dim each), then each categorical + factor occupies one trailing column. This continuous-first layout is what sbi's + mixed density estimator expects. 
+ """ + continuous_factors = [factor for factor in self.factors if factor.type == "continuous"] + categorical_factors = [factor for factor in self.factors if factor.type == "categorical"] + column_slices: dict[str, slice] = {} + column_index = 0 + for factor in continuous_factors + categorical_factors: + column_width = factor.dim if factor.type == "continuous" else 1 + column_slices[factor.name] = slice(column_index, column_index + column_width) + column_index += column_width + return column_slices + + +class SensitivityDataset: + """A FactorSchema paired with its per-episode theta (factors) and x (outcomes). + + The object is a pure container: it holds the schema and the two tensors, and exposes + the prior and column layouts an analyzer consumes. It can be built two ways: + + - from_files — parse a factors.yaml / episode_summary.jsonl pair + (the path eval runs take). + - the constructor — wrap in-memory tensors directly (what a synthetic simulator or + a unit test takes). The tensors must already be in the layout factor_columns + describes: continuous columns first, then one integer-coded column per categorical. + """ + + def __init__( + self, + schema: FactorSchema, + theta: torch.Tensor, + x: torch.Tensor, + outcome_names: list[str] | tuple[str, ...] = ("success",), + ): + """Wrap an in-memory schema plus its theta / x tensors, validating shapes. + + Args: + schema: The parsed factor schema. Continuous factors must carry a range; + categorical factors must carry choices. + theta: (num_episodes, total_factor_dim) factor matrix, continuous-first. + x: (num_episodes, num_outcomes) outcome matrix. + outcome_names: Name of each outcome column in x, in order (used for plot labels). 
+ """ + assert theta.ndim == 2 and x.ndim == 2, f"theta and x must be 2D; got {theta.shape} and {x.shape}" + assert theta.shape[0] == x.shape[0], f"theta/x row counts disagree: {theta.shape[0]} vs {x.shape[0]}" + assert theta.shape[0] > 0, "Dataset is empty (no episodes)" + assert ( + theta.shape[1] == schema.total_factor_dim + ), f"theta has {theta.shape[1]} columns but schema declares {schema.total_factor_dim} factor dims" + assert x.shape[1] == len( + outcome_names + ), f"x has {x.shape[1]} columns but {len(outcome_names)} outcome name(s) were given" + self.schema = schema + self.outcome_names = list(outcome_names) + self._theta = theta + self._x = x + + @classmethod + def from_files( + cls, + factors_yaml: str | Path, + jsonl_path: str | Path, + outcome_names: list[str] | tuple[str, ...] = ("success",), + ) -> SensitivityDataset: + """Build a dataset from a factors.yaml schema and an episode_summary.jsonl. + + Parses and validates both, infers any missing continuous range from the data, and + assembles the theta / x tensors in the layout the analyzers expect. ``outcome_names`` + selects which per-episode outcome columns to condition on (the analysis-time query). + """ + schema = FactorSchema.from_yaml(factors_yaml) + + jsonl_text = Path(jsonl_path).read_text(encoding="utf-8") + rows = [json.loads(line) for line in jsonl_text.splitlines() if line.strip()] + assert len(rows) > 0, f"Empty episode_summary.jsonl at {jsonl_path}" + + _validate_rows(schema, rows, outcome_names, jsonl_path) + _infer_missing_factor_ranges(schema, rows) + + theta = _build_factor_tensor(schema, rows) + x = _build_outcome_tensor(rows, outcome_names) + return cls(schema, theta, x, outcome_names) + + @property + def theta(self) -> torch.Tensor: + """(num_episodes, total_factor_dim) matrix of factor values, one row per episode. + + This is the "input" sbi infers a posterior over. Column layout is given by + factor_columns — continuous factors first, then categoricals (integer-coded). 
+ """ + return self._theta + + @property + def x(self) -> torch.Tensor: + """(num_episodes, num_outcomes) matrix of outcome values, one row per episode. + + This is what the analyzer conditions queries on — "what factor values were consistent + with observing these outcomes?". Columns are named by ``outcome_names``. + """ + return self._x + + @property + def num_episodes(self) -> int: + """Number of episodes (rows) in the dataset.""" + return self._theta.shape[0] + + @property + def factor_columns(self) -> dict[str, slice]: + """Map factor name → its column slice in theta. Same as schema.factor_columns.""" + return self.schema.factor_columns + + def default_observation(self) -> torch.Tensor: + """The default outcome vector to condition a query on: success (1) for every outcome. + + Outcomes are binary (0/1) in the current scope, so the natural default query is + "what produced success?". Asserts the outcomes are binary, so adding a continuous + outcome later fails loudly here instead of silently conditioning on a meaningless value. + """ + is_binary = set(self._x.flatten().tolist()).issubset({0.0, 1.0}) + assert is_binary, "default_observation assumes binary (0/1) outcomes; pass an explicit observation otherwise." + return torch.ones(self._x.shape[1], dtype=torch.float32) + + @property + def has_categorical_factors(self) -> bool: + """True iff the schema declares at least one categorical factor.""" + return any(factor.type == "categorical" for factor in self.schema.factors) + + +def _validate_rows( + schema: FactorSchema, rows: list[dict], outcome_names: list[str] | tuple[str, ...], jsonl_path: str | Path +) -> None: + """Assert every JSONL row carries the declared factor keys and the requested outcome keys. + + The declared names need only be a subset of each row's arena_env_args / outcomes; + extra keys are ignored. Raises pointing at the first offending row. 
+ """ + expected_factor_names = {factor.name for factor in schema.factors} + expected_outcome_names = set(outcome_names) + for row_index, row in enumerate(rows): + assert ( + "arena_env_args" in row and "outcomes" in row + ), f"Row {row_index} of {jsonl_path} missing arena_env_args/outcomes block" + missing_factor_names = expected_factor_names - set(row["arena_env_args"].keys()) + assert not missing_factor_names, ( + f"Row {row_index} of {jsonl_path} is missing factor(s) " + f"{sorted(missing_factor_names)} from its arena_env_args block; " + f"factors.yaml declares: {sorted(expected_factor_names)}" + ) + missing_outcome_names = expected_outcome_names - set(row["outcomes"].keys()) + assert ( + not missing_outcome_names + ), f"Row {row_index} of {jsonl_path} missing outcomes {sorted(missing_outcome_names)}" + + +def _infer_missing_factor_ranges(schema: FactorSchema, rows: list[dict]) -> None: + """Fill any continuous factor's missing range from the observed min/max. + + A range declared in factors.yaml takes precedence and is left untouched. + """ + for factor in schema.factors: + if factor.type != "continuous" or factor.range is not None: + continue + if factor.dim != 1: + raise NotImplementedError( + "Range inference for vector factors (dim > 1) is not implemented;" + f" factor {factor.name!r} has dim={factor.dim}" + ) + observed_values = [float(row["arena_env_args"][factor.name]) for row in rows] + factor.range = [(min(observed_values), max(observed_values))] + + +def _build_factor_tensor(schema: FactorSchema, rows: list[dict]) -> torch.Tensor: + """Assemble the per-episode factor matrix theta. + + Continuous columns first (one per dim), then one column per categorical factor with its + value integer-coded as a float32 index into FactorSpec.choices. 
+ """ + continuous_factors = [factor for factor in schema.factors if factor.type == "continuous"] + categorical_factors = [factor for factor in schema.factors if factor.type == "categorical"] + + factor_columns: list[torch.Tensor] = [] + + # Continuous columns come first (sbi MNPE convention). + for factor in continuous_factors: + if factor.dim != 1: + raise NotImplementedError( + "Vector continuous factors (dim > 1) are not yet supported;" + f" factor {factor.name!r} has dim={factor.dim}" + ) + raw_values = [float(row["arena_env_args"][factor.name]) for row in rows] + factor_column = torch.tensor(raw_values, dtype=torch.float32).unsqueeze(1) + factor_columns.append(factor_column) + + # Categorical columns: integer-code each string value as its index in FactorSpec.choices. + for factor in categorical_factors: + assert ( + factor.choices is not None and len(factor.choices) > 0 + ), f"Categorical factor {factor.name!r} has no `choices:` block in factors.yaml" + choice_to_code = {choice: code for code, choice in enumerate(factor.choices)} + category_codes: list[int] = [] + for row_index, row in enumerate(rows): + value = row["arena_env_args"][factor.name] + assert ( + value in choice_to_code + ), f"Row {row_index} factor {factor.name!r} has value {value!r} not in declared choices {factor.choices}" + category_codes.append(choice_to_code[value]) + factor_column = torch.tensor(category_codes, dtype=torch.float32).unsqueeze(1) + factor_columns.append(factor_column) + + if factor_columns: + return torch.cat(factor_columns, dim=1) + return torch.zeros((len(rows), 0), dtype=torch.float32) + + +def _build_outcome_tensor(rows: list[dict], outcome_names: list[str] | tuple[str, ...]) -> torch.Tensor: + """Assemble the per-episode outcome matrix x (one column per requested outcome). + + Each outcome value is cast to float; bool outcomes become 0.0/1.0. 
+ """ + outcome_column_tensors = [ + torch.tensor([float(row["outcomes"][name]) for row in rows], dtype=torch.float32).unsqueeze(1) + for name in outcome_names + ] + return torch.cat(outcome_column_tensors, dim=1) diff --git a/isaaclab_arena/analysis/sensitivity/generate_report.py b/isaaclab_arena/analysis/sensitivity/generate_report.py new file mode 100644 index 0000000000..a746ceb3a2 --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/generate_report.py @@ -0,0 +1,117 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import argparse +import matplotlib.pyplot as plt +import torch +from pathlib import Path + +from isaaclab_arena.analysis.sensitivity.analyzer import SensitivityAnalyzer +from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset +from isaaclab_arena.analysis.sensitivity.plotting import plot_marginals + + +def generate_report( + factors_yaml_path: str | Path, + jsonl_path: str | Path, + output_path: str | Path, + outcome_names: list[str] | tuple[str, ...] = ("success",), + observation: list[float] | None = None, + seed: int | None = 0, +) -> Path: + """Build a sensitivity report from a factors.yaml / episode_summary.jsonl pair. + + Loads the data, fits a SensitivityAnalyzer, and saves a single posterior-marginals + figure. The output format follows the output_path extension (.png, .pdf, …). + + Args: + factors_yaml_path: Schema file declaring the factors. + jsonl_path: episode_summary.jsonl produced by eval_runner. + output_path: Destination figure file (parent dirs created if absent). + outcome_names: Which per-episode outcome(s) to condition on. + observation: Outcome values to condition on, one per outcome name. Defaults to + conditioning on success (1) for every (binary) outcome. 
+        seed: Seed for torch's global RNG, set once before fitting so the estimator training
+            and posterior sampling are reproducible. Pass ``None`` to leave the RNG untouched.
+
+    Returns:
+        The resolved output path.
+    """
+    # Estimator training (fit) and posterior sampling both draw from torch's global RNG in
+    # sequence, so seeding once here makes the whole report reproducible.
+    if seed is not None:
+        torch.manual_seed(seed)
+
+    dataset = SensitivityDataset.from_files(Path(factors_yaml_path), Path(jsonl_path), outcome_names)
+    analyzer = SensitivityAnalyzer(dataset)
+    analyzer.fit()
+
+    observation_tensor = (
+        dataset.default_observation() if observation is None else torch.tensor(observation, dtype=torch.float32)
+    )
+    samples = analyzer.sample_posterior(observation_tensor)
+    output_path = Path(output_path)
+    plot_marginals(samples, dataset, observation_tensor, output_path=str(output_path))
+    plt.close("all")
+    print(f"[INFO] Wrote report → {output_path}")
+    return output_path
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description=(
+            "Build a sensitivity report (one posterior-marginal panel per factor) from a "
+            "(factors.yaml, episode_summary.jsonl) pair. Output format follows the --output extension."
+        )
+    )
+    parser.add_argument("--factors_yaml", type=str, required=True, help="Path to factors.yaml.")
+    parser.add_argument(
+        "--episode_summary", type=str, required=True, help="Path to episode_summary.jsonl produced by eval_runner."
+    )
+    parser.add_argument(
+        "--output",
+        type=str,
+        default="eval/sensitivity_report.png",
+        help="Output figure file; format follows the extension (.png, .pdf, …). Default: eval/sensitivity_report.png.",
+    )
+    parser.add_argument(
+        "--outcome",
+        type=str,
+        nargs="+",
+        default=["success"],
+        help="Which per-episode outcome(s) to condition on (keys in the rows' outcomes block). Default: success.",
+    )
+    parser.add_argument(
+        "--observation",
+        type=float,
+        nargs="+",
+        default=None,
+        help=(
+            "Outcome values to condition on, one per --outcome (in order). "
+            "Outcomes are binary, so use 1 for success or 0 for failure. Defaults to 1 (success)."
+        ),
+    )
+    parser.add_argument(
+        "--seed",
+        type=int,
+        default=0,
+        help="Seed for torch's global RNG, so estimator training + sampling are reproducible. Default: 0.",
+    )
+    args = parser.parse_args()
+
+    generate_report(
+        args.factors_yaml,
+        args.episode_summary,
+        args.output,
+        outcome_names=args.outcome,
+        observation=args.observation,
+        seed=args.seed,
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/isaaclab_arena/analysis/sensitivity/plotting.py b/isaaclab_arena/analysis/sensitivity/plotting.py
new file mode 100644
index 0000000000..73a4961e7b
--- /dev/null
+++ b/isaaclab_arena/analysis/sensitivity/plotting.py
@@ -0,0 +1,122 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import math
+import matplotlib.pyplot as plt
+import numpy as np
+from pathlib import Path
+from scipy.stats import gaussian_kde
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import torch
+
+    from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec, SensitivityDataset
+
+_CONTINUOUS_COLOR = "steelblue"
+_CATEGORICAL_COLOR = "steelblue"
+_MEAN_COLOR = "firebrick"
+
+
+def plot_marginals(
+    samples: torch.Tensor,
+    dataset: SensitivityDataset,
+    observation: torch.Tensor,
+    output_path: str | None = None,
+):
+    """Plot the posterior marginal of every factor in a single figure.
+
+    A pure renderer: it draws already-sampled posterior draws and does not run inference.
+    One panel per factor — a density curve for continuous factors, a probability bar chart
+    for categorical ones, wrapped into a grid.
+
+    Args:
+        samples: ``(num_samples, total_factor_dim)`` posterior draws in the dataset's factor
+            layout (continuous-first, original units), e.g. from ``SensitivityAnalyzer.sample_posterior``.
+        dataset: The dataset, for the factor schema and column layout.
+        observation: The outcome vector the samples were conditioned on (shown in the title).
+        output_path: If given, save the figure here. The format follows the path's
+            extension (.png, .pdf, …); parent directories are created.
+
+    Returns:
+        The matplotlib Figure.
+    """
+    samples = samples.cpu().numpy()
+    factors = dataset.schema.factors
+    # Wrap panels into a grid (at most 3 columns) so many factors stay readable.
+    num_columns = min(3, len(factors))
+    num_rows = math.ceil(len(factors) / num_columns)
+    figure, axes = plt.subplots(num_rows, num_columns, figsize=(6.0 * num_columns, 4.5 * num_rows), squeeze=False)
+    flat_axes = axes.flatten()
+    for axis_index, factor in enumerate(factors):
+        ax = flat_axes[axis_index]
+        factor_samples = samples[:, dataset.factor_columns[factor.name]].squeeze(-1)
+        if factor.type == "continuous":
+            _draw_continuous_marginal(ax, factor, factor_samples)
+        else:
+            _draw_categorical_marginal(ax, factor, factor_samples)
+        ax.set_title(factor.name, fontsize=11)
+    for unused_index in range(len(factors), len(flat_axes)):
+        flat_axes[unused_index].axis("off")
+
+    observation_label = ", ".join(
+        f"{name}={value:g}" for name, value in zip(dataset.outcome_names, observation.tolist())
+    )
+    figure.suptitle(
+        f"Posterior marginals — {dataset.num_episodes} episodes (observed: {observation_label})",
+        fontsize=12,
+        fontweight="bold",
+    )
+    figure.tight_layout(rect=[0, 0, 1, 0.95])
+
+    if output_path is not None:
+        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+        figure.savefig(output_path, dpi=150, bbox_inches="tight")
+    return figure
+
+
+def _draw_continuous_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray) -> None:
+    """Smooth posterior density (filled KDE curve) of a continuous factor, with a mean line.
+
+    A KDE line over the posterior samples reads the shape of a continuous posterior better
+    than a binned histogram. Falls back to a single line at the mean when the samples have
+    no spread (KDE bandwidth is then undefined).
+    """
+    range_low, range_high = factor.range[0]
+    sample_mean = float(np.mean(factor_samples))
+    if float(np.std(factor_samples)) >= 1e-9:
+        grid = np.linspace(range_low, range_high, 200)
+        density = gaussian_kde(factor_samples)(grid)
+        ax.plot(grid, density, color=_CONTINUOUS_COLOR, linewidth=2)
+        ax.fill_between(grid, 0, density, color=_CONTINUOUS_COLOR, alpha=0.2)
+        ax.set_ylim(bottom=0)
+    ax.axvline(sample_mean, color=_MEAN_COLOR, linestyle="--", linewidth=2, label=f"mean = {sample_mean:.3g}")
+    ax.set_xlim(range_low, range_high)
+    ax.set_xlabel(factor.name)
+    ax.set_ylabel("posterior density")
+    ax.legend(loc="best", fontsize=9)
+    ax.grid(alpha=0.3)
+
+
+def _draw_categorical_marginal(ax, factor: FactorSpec, factor_samples: np.ndarray) -> None:
+    """Bar chart of a categorical factor's posterior probability per choice.
+
+    sbi returns categorical columns as floats over the integer-code support, so samples are
+    rounded to the nearest code in [0, num_choices - 1] and tallied into frequencies.
+    """
+    assert factor.choices is not None
+    num_choices = len(factor.choices)
+    codes = np.clip(np.round(factor_samples), 0, num_choices - 1).astype(int)
+    probabilities = np.bincount(codes, minlength=num_choices) / len(codes)
+
+    ax.bar(range(num_choices), probabilities, color=_CATEGORICAL_COLOR, alpha=0.8)
+    ax.set_xticks(range(num_choices))
+    ax.set_xticklabels(factor.choices, rotation=30, ha="right")
+    ax.set_xlabel(factor.name)
+    ax.set_ylabel("posterior probability")
+    ax.set_ylim(0, 1.05)
+    ax.grid(alpha=0.3, axis="y")
diff --git a/isaaclab_arena/tests/sensitivity_synthetic.py b/isaaclab_arena/tests/sensitivity_synthetic.py
new file mode 100644
index 0000000000..056b6ef50f
--- /dev/null
+++ b/isaaclab_arena/tests/sensitivity_synthetic.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Synthetic sensitivity datasets with a *known* ground-truth relationship.
+
+A simple forward simulator: it samples factors from a uniform prior, runs them through a
+fixed generative model, and returns a SensitivityDataset of in-memory theta / x tensors —
+no factors.yaml or episode_summary.jsonl round-trip. Because the planted relationship is
+known, a test can fit a SensitivityAnalyzer on the data and assert the recovered posterior
+reflects it.
+
+Ground truth (single-sourced in the factor definitions below):
+  - light_intensity is continuous; brighter raises success (LIGHT.weight > 0).
+  - grasp_offset is continuous; a *smaller* offset raises success (GRASP_OFFSET.weight < 0).
+  - table_material is categorical; MATERIAL makes oak the most successful, bamboo the least.
+  - success is a binary outcome drawn from Bernoulli(sigmoid(logit)).
+
+make_mixed_dataset exercises the MNPE path (continuous + categorical); make_continuous_dataset
+exercises the NPE path with two continuous factors (NPE restricts to a Gaussian on 1-D theta).
+"""
+
+from __future__ import annotations
+
+import argparse
+import torch
+from dataclasses import dataclass
+
+from isaaclab_arena.analysis.sensitivity.analyzer import SensitivityAnalyzer
+from isaaclab_arena.analysis.sensitivity.dataset import FactorSchema, FactorSpec, SensitivityDataset
+from isaaclab_arena.analysis.sensitivity.plotting import plot_marginals
+
+
+@dataclass(frozen=True)
+class _ContinuousFactor:
+    """A continuous factor with a planted, signed effect on the success logit."""
+
+    name: str
+    value_range: tuple[float, float]
+    weight: float  # success-logit gain per normalized unit; the sign sets the direction of the effect
+
+    def sample(self, num_episodes: int) -> torch.Tensor:
+        low, high = self.value_range
+        return torch.rand(num_episodes) * (high - low) + low
+
+    def logit(self, values: torch.Tensor) -> torch.Tensor:
+        low, high = self.value_range
+        normalized = (values - 0.5 * (low + high)) / (0.5 * (high - low))  # map value_range onto [-1, 1]
+        return self.weight * normalized
+
+    def spec(self) -> FactorSpec:
+        return FactorSpec(name=self.name, type="continuous", range=[list(self.value_range)])
+
+    def column(self, values: torch.Tensor) -> torch.Tensor:
+        return values
+
+
+@dataclass(frozen=True)
+class _CategoricalFactor:
+    """A categorical factor with a per-choice base success logit (ordered best→worst)."""
+
+    name: str
+    base_logit: dict[str, float]
+
+    @property
+    def choices(self) -> list[str]:
+        return list(self.base_logit)
+
+    def sample(self, num_episodes: int) -> torch.Tensor:
+        return torch.randint(0, len(self.base_logit), (num_episodes,))
+
+    def logit(self, codes: torch.Tensor) -> torch.Tensor:
+        return torch.tensor([self.base_logit[choice] for choice in self.choices])[codes]
+
+    def spec(self) -> FactorSpec:
+        return FactorSpec(name=self.name, type="categorical", choices=self.choices)
+
+    def column(self, codes: torch.Tensor) -> torch.Tensor:
+        return codes.float()
+
+
+# Planted ground truth: brighter light, a smaller grasp offset, a lighter object, a closer
+# camera, and the leading category (oak / cube) all raise success.
+LIGHT = _ContinuousFactor("light_intensity", (0.0, 5000.0), weight=2.5)
+GRASP_OFFSET = _ContinuousFactor("grasp_offset", (0.0, 0.2), weight=-2.5)
+OBJECT_MASS = _ContinuousFactor("object_mass", (0.05, 2.0), weight=-1.5)
+CAMERA_DISTANCE = _ContinuousFactor("camera_distance", (0.3, 1.5), weight=-1.5)
+MATERIAL = _CategoricalFactor("table_material", {"oak": 1.5, "walnut": 0.0, "bamboo": -1.5})
+OBJECT_TYPE = _CategoricalFactor("object_type", {"cube": 1.2, "can": 0.0, "mug": -1.2})
+
+
+def _sample_success(success_logit: torch.Tensor) -> torch.Tensor:
+    """Draw a binary success outcome per episode from Bernoulli(sigmoid(logit))."""
+    return torch.bernoulli(torch.sigmoid(success_logit))
+
+
+def _build_dataset(
+    factors_and_columns: list[tuple[_ContinuousFactor | _CategoricalFactor, torch.Tensor]],
+    success: torch.Tensor,
+) -> SensitivityDataset:
+    """Assemble a SensitivityDataset from (factor, sampled column) pairs and the success outcome.
+
+    Continuous factors are placed before the categorical ones, matching the layout
+    SensitivityDataset.factor_columns expects.
+    """
+    ordered = sorted(factors_and_columns, key=lambda pair: isinstance(pair[0], _CategoricalFactor))
+    schema = FactorSchema(factors=[factor.spec() for factor, _ in ordered])
+    theta = torch.stack([factor.column(values) for factor, values in ordered], dim=1)
+    # outcome_names defaults to ("success",), matching the single binary outcome built here.
+    return SensitivityDataset(schema, theta, success.unsqueeze(1))
+
+
+def make_continuous_dataset(seed: int, num_episodes: int = 2000) -> SensitivityDataset:
+    """Two continuous factors (light_intensity, grasp_offset) driving success.
+
+    Uses the NPE path. Both effects are planted — brighter light and a smaller grasp offset
+    raise success — so conditioning the posterior on success should favor high light values
+    and low offset values. Two factors keep theta 2-D, away from NPE's 1-D Gaussian fallback.
+    """
+    torch.manual_seed(seed)
+    light = LIGHT.sample(num_episodes)
+    grasp_offset = GRASP_OFFSET.sample(num_episodes)
+    success = _sample_success(LIGHT.logit(light) + GRASP_OFFSET.logit(grasp_offset))
+    return _build_dataset([(LIGHT, light), (GRASP_OFFSET, grasp_offset)], success)
+
+
+def make_mixed_dataset(seed: int, num_episodes: int = 3000) -> SensitivityDataset:
+    """Mixed continuous + categorical factors driving success (MNPE path).
+
+    A realistic multi-factor sweep: three continuous factors on different scales (light,
+    mass, camera distance) and two categoricals (object type, table material). Every effect
+    is planted (brighter / lighter / closer / cube / oak raise success), so the posterior
+    conditioned on success should recover all of them at once.
+    """
+    torch.manual_seed(seed)
+    light = LIGHT.sample(num_episodes)
+    object_mass = OBJECT_MASS.sample(num_episodes)
+    camera_distance = CAMERA_DISTANCE.sample(num_episodes)
+    object_type = OBJECT_TYPE.sample(num_episodes)
+    material = MATERIAL.sample(num_episodes)
+    success = _sample_success(
+        LIGHT.logit(light)
+        + OBJECT_MASS.logit(object_mass)
+        + CAMERA_DISTANCE.logit(camera_distance)
+        + OBJECT_TYPE.logit(object_type)
+        + MATERIAL.logit(material)
+    )
+    return _build_dataset(
+        [
+            (LIGHT, light),
+            (OBJECT_MASS, object_mass),
+            (CAMERA_DISTANCE, camera_distance),
+            (OBJECT_TYPE, object_type),
+            (MATERIAL, material),
+        ],
+        success,
+    )
+
+
+def _demo():
+    """Run the full pipeline on a synthetic dataset and save the marginals plot.
+
+    Runs the pipeline end to end on generated data: simulate → fit → plot, with no eval
+    data needed. Run as::
+
+        python -m isaaclab_arena.tests.sensitivity_synthetic --kind mixed --output eval/demo.png
+    """
+    parser = argparse.ArgumentParser(description="Run the sensitivity pipeline on a synthetic dataset and plot it.")
+    parser.add_argument(
+        "--kind",
+        choices=["mixed", "continuous"],
+        default="mixed",
+        help="'mixed' (continuous + categorical, MNPE) or 'continuous' (continuous-only, NPE).",
+    )
+    parser.add_argument(
+        "--output",
+        default="eval/sensitivity_synthetic.png",
+        help="Output figure path; format follows the extension.",
+    )
+    parser.add_argument("--seed", type=int, default=0)
+    parser.add_argument("--num-episodes", type=int, default=2000)
+    args = parser.parse_args()
+
+    builder = {"mixed": make_mixed_dataset, "continuous": make_continuous_dataset}[args.kind]
+    dataset = builder(seed=args.seed, num_episodes=args.num_episodes)
+    analyzer = SensitivityAnalyzer(dataset)
+    analyzer.fit()
+    observation = dataset.default_observation()
+    samples = analyzer.sample_posterior(observation)
+    plot_marginals(samples, dataset, observation, output_path=args.output)
+    print(f"[INFO] Wrote synthetic sensitivity report → {args.output}")
+
+
+if __name__ == "__main__":
+    _demo()
diff --git a/isaaclab_arena/tests/test_sensitivity_analysis.py b/isaaclab_arena/tests/test_sensitivity_analysis.py
new file mode 100644
index 0000000000..cf6d50a799
--- /dev/null
+++ b/isaaclab_arena/tests/test_sensitivity_analysis.py
@@ -0,0 +1,152 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""End-to-end sensitivity-analysis tests on synthetic data with a known ground truth.
+
+Each test fits a SensitivityAnalyzer on a dataset whose factor→outcome relationships are
+planted by the synthetic module (brighter / lighter / closer / cube / oak raise success),
+then asserts the posterior conditioned on success recovers them. The data is built in
+memory, so these run on CPU without Isaac Sim. They cover both estimator paths: MNPE for a
+mixed schema, NPE for a continuous-only one (2-D theta).
+"""
+
+from __future__ import annotations
+
+import json
+import numpy as np
+import torch
+
+from isaaclab_arena.analysis.sensitivity.analyzer import SensitivityAnalyzer
+from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset
+from isaaclab_arena.tests.sensitivity_synthetic import (
+    CAMERA_DISTANCE,
+    GRASP_OFFSET,
+    LIGHT,
+    MATERIAL,
+    OBJECT_MASS,
+    OBJECT_TYPE,
+    make_continuous_dataset,
+    make_mixed_dataset,
+)
+
+_NUM_SAMPLES = 5000
+
+
+def _factor_samples(analyzer: SensitivityAnalyzer, samples: torch.Tensor, factor_name: str) -> np.ndarray:
+    """Pull one factor's column out of a posterior-sample tensor as a 1-D numpy array."""
+    return samples[:, analyzer.dataset.factor_columns[factor_name]].squeeze(-1).cpu().numpy()
+
+
+def _midpoint(factor) -> float:
+    """Midpoint of a continuous factor's range — the threshold a recovered mean should beat."""
+    low, high = factor.value_range
+    return 0.5 * (low + high)
+
+
+def _most_likely_choice(analyzer, samples, factor_name: str, choices: list[str]) -> str:
+    """The categorical choice the posterior favors (mode over rounded integer-coded samples)."""
+    codes = np.clip(np.round(_factor_samples(analyzer, samples, factor_name)), 0, len(choices) - 1).astype(int)
+    probabilities = np.bincount(codes, minlength=len(choices)) / len(codes)
+    return choices[int(probabilities.argmax())]
+
+
+def test_mnpe_recovers_all_planted_effects():
+    """Mixed continuous + categorical (MNPE): recover every planted effect at once."""
+    dataset = make_mixed_dataset(seed=0)
+    analyzer = SensitivityAnalyzer(dataset)
+    assert analyzer._select_inference_class().__name__ == "MNPE", "mixed schema should select MNPE"
+
+    torch.manual_seed(0)
+    analyzer.fit()
+    samples = analyzer.sample_posterior(num_samples=_NUM_SAMPLES)  # conditions on success=1 by default
+
+    # Continuous effects: brighter light, a lighter object, and a closer camera raise success.
+    assert _factor_samples(analyzer, samples, "light_intensity").mean() > _midpoint(LIGHT)
+    assert _factor_samples(analyzer, samples, "object_mass").mean() < _midpoint(OBJECT_MASS)
+    assert _factor_samples(analyzer, samples, "camera_distance").mean() < _midpoint(CAMERA_DISTANCE)
+
+    # Categorical effects: cube and oak are the planted best choices.
+    assert _most_likely_choice(analyzer, samples, "object_type", OBJECT_TYPE.choices) == "cube"
+    assert _most_likely_choice(analyzer, samples, "table_material", MATERIAL.choices) == "oak"
+
+
+def test_npe_recovers_two_continuous_effects():
+    """Two continuous factors (NPE): recover that bright light and a small grasp offset drive success."""
+    dataset = make_continuous_dataset(seed=0)
+    analyzer = SensitivityAnalyzer(dataset)
+    assert analyzer._select_inference_class().__name__.startswith("NPE"), "continuous-only schema should select NPE"
+
+    torch.manual_seed(0)
+    analyzer.fit()
+    samples = analyzer.sample_posterior(num_samples=_NUM_SAMPLES)  # conditions on success=1 by default
+
+    # Brighter light raises success → light posterior skews high.
+    assert _factor_samples(analyzer, samples, "light_intensity").mean() > _midpoint(LIGHT)
+    # A smaller grasp offset raises success → offset posterior skews low.
+    assert _factor_samples(analyzer, samples, "grasp_offset").mean() < _midpoint(GRASP_OFFSET)
+
+
+def _write_jsonl(path, rows: list[dict]) -> None:
+    """Write one JSON object per line to ``path``."""
+    path.write_text("\n".join(json.dumps(row) for row in rows) + "\n", encoding="utf-8")
+
+
+def test_from_files_parses_mixed_schema_and_builds_tensors(tmp_path):
+    """from_files parses a factors.yaml + episode_summary.jsonl into the expected theta / x layout."""
+    factors_yaml = tmp_path / "factors.yaml"
+    factors_yaml.write_text(
+        "factors:\n"
+        "  light_intensity:\n"
+        "    type: continuous\n"
+        "    range: [[0.0, 1000.0]]\n"
+        "  pick_up_object:\n"
+        "    type: categorical\n"
+        "    choices: [cube, can]\n",
+        encoding="utf-8",
+    )
+    jsonl = tmp_path / "episode_summary.jsonl"
+    _write_jsonl(
+        jsonl,
+        [
+            {"arena_env_args": {"light_intensity": 250.0, "pick_up_object": "cube"}, "outcomes": {"success": 1}},
+            {"arena_env_args": {"light_intensity": 750.0, "pick_up_object": "can"}, "outcomes": {"success": 0}},
+            {"arena_env_args": {"light_intensity": 500.0, "pick_up_object": "cube"}, "outcomes": {"success": 1}},
+        ],
+    )
+
+    dataset = SensitivityDataset.from_files(factors_yaml, jsonl, outcome_names=["success"])
+
+    # Schema parsed with the declared structure.
+    factors_by_name = {factor.name: factor for factor in dataset.schema.factors}
+    assert factors_by_name["light_intensity"].type == "continuous"
+    assert factors_by_name["light_intensity"].range == [(0.0, 1000.0)]
+    assert factors_by_name["pick_up_object"].type == "categorical"
+    assert factors_by_name["pick_up_object"].choices == ["cube", "can"]
+
+    # Continuous-first theta layout; categorical integer-coded by its index into choices.
+    assert dataset.theta.shape == (3, 2)
+    assert dataset.x.shape == (3, 1)
+    assert dataset.factor_columns == {"light_intensity": slice(0, 1), "pick_up_object": slice(1, 2)}
+    assert dataset.theta[:, 0].tolist() == [250.0, 750.0, 500.0]
+    assert dataset.theta[:, 1].tolist() == [0.0, 1.0, 0.0]  # cube -> 0, can -> 1
+    assert dataset.x[:, 0].tolist() == [1.0, 0.0, 1.0]
+
+
+def test_from_files_infers_missing_continuous_range(tmp_path):
+    """A continuous factor with no declared range gets [min, max] inferred from the observed values."""
+    factors_yaml = tmp_path / "factors.yaml"
+    factors_yaml.write_text("factors:\n  light_intensity:\n    type: continuous\n", encoding="utf-8")
+    jsonl = tmp_path / "episode_summary.jsonl"
+    _write_jsonl(
+        jsonl,
+        [
+            {"arena_env_args": {"light_intensity": 30.0}, "outcomes": {"success": 0}},
+            {"arena_env_args": {"light_intensity": 90.0}, "outcomes": {"success": 1}},
+        ],
+    )
+
+    dataset = SensitivityDataset.from_files(factors_yaml, jsonl, outcome_names=["success"])
+
+    assert dataset.schema.factors[0].range == [(30.0, 90.0)]
diff --git a/setup.py b/setup.py
index fc4f4e6eaa..582f669ec2 100644
--- a/setup.py
+++ b/setup.py
@@ -16,6 +16,10 @@
     "pytest",
     "pydantic>=2.0",
     "openai>=2.0",
+    # Sensitivity analysis (isaaclab_arena.analysis.sensitivity), imported at module level.
+    "sbi",
+    "scipy",
+    "matplotlib",
 ]
 
 DEV_DEPS = [