From a2405af736b30b07cf1b4542f5533889fc6f35c5 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Fri, 12 Jun 2026 15:30:25 +0200 Subject: [PATCH 1/2] Record realized variation draws and a sensitivity factor schema Build-time variations now stash their sampled value (VariationBase.last_draw), and the builder exposes two collectors keyed by .: - get_enabled_variation_factor_schema(): the prior (type + range) per enabled variation, derived from its sampler (Uniform -> continuous, Choice -> categorical) - get_enabled_variation_draws(): this build's realized values These feed the sensitivity episode-summary writer so variation-sampled factors are recorded the same way static CLI factors already are. Run-time (per-reset) draws are not captured yet (would need a per-env buffer). Signed-off-by: Clemens Volk --- .../environments/arena_env_builder.py | 17 +++ isaaclab_arena/evaluation/episode_writer.py | 107 ++++++++++++++++++ .../variations/hdr_image_variation.py | 1 + .../variations/light_intensity_variation.py | 1 + isaaclab_arena/variations/variation_base.py | 16 +++ .../variations/variation_recording.py | 87 ++++++++++++++ 6 files changed, 229 insertions(+) create mode 100644 isaaclab_arena/evaluation/episode_writer.py create mode 100644 isaaclab_arena/variations/variation_recording.py diff --git a/isaaclab_arena/environments/arena_env_builder.py b/isaaclab_arena/environments/arena_env_builder.py index 9cee83ba7..332b709a5 100644 --- a/isaaclab_arena/environments/arena_env_builder.py +++ b/isaaclab_arena/environments/arena_env_builder.py @@ -36,6 +36,7 @@ from isaaclab_arena.utils.multiprocess import get_local_rank from isaaclab_arena.variations import variations_hydra, variations_printing from isaaclab_arena.variations.variation_base import BuildTimeVariationBase, RunTimeVariationBase, VariationBase +from isaaclab_arena.variations.variation_recording import collect_variation_draws, collect_variation_factor_schema class ArenaEnvBuilder: @@ -98,6 +99,22 @@ def get_variations_catalogue_as_string(self) -> str: variations: dict[str, list[VariationBase]] = self.get_all_variations() return variations_printing.get_variations_catalogue_as_string(variations, hydra_overrides=self.hydra_overrides) + def get_enabled_variation_factor_schema(self) -> dict[str, dict]: + """Return the sensitivity factor schema for every enabled variation (``.``). + + Run-level metadata: which variations were varied and the prior (type + range) each samples + from. Logged once into the episode-summary header so the analysis is self-describing. + """ + return collect_variation_factor_schema(self.get_all_variations()) + + def get_enabled_variation_draws(self) -> dict[str, object]: + """Return ``{.: realized_value}`` for this build's enabled variations. + + Build-time variations sample once per build, so the returned draws are fixed for the + lifetime of the env this builder produced. Call after the env is built. + """ + return collect_variation_draws(self.get_all_variations()) + def _compose_variations_event_cfg(self) -> Any | None: """Build a configclass with one :class:`EventTermCfg` per enabled run-time variation. diff --git a/isaaclab_arena/evaluation/episode_writer.py b/isaaclab_arena/evaluation/episode_writer.py new file mode 100644 index 000000000..df88f5e52 --- /dev/null +++ b/isaaclab_arena/evaluation/episode_writer.py @@ -0,0 +1,107 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import json +from pathlib import Path +from typing import TYPE_CHECKING + +from isaaclab_arena.metrics.metrics_logger import metrics_to_plain_python_types + +if TYPE_CHECKING: + from isaaclab_arena.evaluation.job_manager import Job + + +# Bumped when the on-disk record shape changes so readers can branch on it. +SCHEMA_VERSION = 1 + + +def write_episode_summaries(env, job: Job, output_path: str | Path) -> int: + """Append one JSONL row per recorded episode for the just-completed job. + + The file is self-describing: its first line is a ``"record": "meta"`` header (written once, + by whichever job reaches an empty/absent file) carrying the run-level slice identity, the + enabled-variation factor schema, and the outcome names. Every later line is an episode:: + + { + "record": "episode", + "job_name": "", + "episode_idx": , + "arena_env_args": , + "variation_draws": . values this build sampled>, + "outcomes": + } + + Per-episode metric values come from the env's ``MetricsManager`` (the same machinery that + backs ``compute_metrics``), so all HDF5/metric access stays in the metrics layer. The + variation factor schema and draws are read off the env where ``load_env`` attached them. + + Args: + env: The (possibly gym-wrapped) Arena env that just finished its rollout. Its + ``MetricsManager`` provides the per-episode metric values. + job: The Job that ran. Its ``arena_env_args_dict`` is logged verbatim under + ``arena_env_args``. + output_path: JSONL file to append to. Created (with parent dirs) if absent. + + Returns: + Number of episode rows written. + """ + unwrapped_env = env.unwrapped + if not hasattr(unwrapped_env.cfg, "metrics") or unwrapped_env.cfg.metrics is None: + return 0 + + per_episode_metrics = unwrapped_env.metrics_manager.compute_per_episode() + arena_env_args_snapshot = dict(job.arena_env_args_dict) + # Build-time draws are fixed for this env, so the same dict applies to every episode it ran. + variation_draws = dict(getattr(unwrapped_env, "arena_variation_draws", {}) or {}) + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + _write_run_header_if_new(output_path, unwrapped_env, job) + + with open(output_path, "a", encoding="utf-8") as jsonl_output: + for episode_idx, episode_metrics in enumerate(per_episode_metrics): + summary_row = { + "record": "episode", + "job_name": job.name, + "episode_idx": episode_idx, + "arena_env_args": arena_env_args_snapshot, + "variation_draws": variation_draws, + "outcomes": metrics_to_plain_python_types(episode_metrics), + } + jsonl_output.write(json.dumps(summary_row) + "\n") + + return len(per_episode_metrics) + + +def _write_run_header_if_new(output_path: Path, unwrapped_env, job: Job) -> None: + """Write the one-time ``"record": "meta"`` header iff ``output_path`` is empty/absent. + + Made idempotent by the empty-file check so it fires exactly once per run — including across + the per-chunk subprocesses that append to a shared file (chunks run sequentially, so the + first writes the header and the rest see a non-empty file and skip). + """ + if output_path.exists() and output_path.stat().st_size > 0: + return + header = { + "record": "meta", + "schema_version": SCHEMA_VERSION, + "slice": _slice_identity(job), + "factors": dict(getattr(unwrapped_env, "arena_variation_factor_schema", {}) or {}), + "outcome_names": list(unwrapped_env.metrics_manager.active_terms), + } + with open(output_path, "a", encoding="utf-8") as jsonl_output: + jsonl_output.write(json.dumps(header) + "\n") + + +def _slice_identity(job: Job) -> dict: + """The (policy, task, embodiment) tuple this run analyses — MNPE assumes a single source.""" + arena_env_args = job.arena_env_args_dict + return { + "environment": arena_env_args.get("environment"), + "embodiment": arena_env_args.get("embodiment"), + "policy_type": job.policy_type, + } diff --git a/isaaclab_arena/variations/hdr_image_variation.py b/isaaclab_arena/variations/hdr_image_variation.py index 692e33c5e..14940198c 100644 --- a/isaaclab_arena/variations/hdr_image_variation.py +++ b/isaaclab_arena/variations/hdr_image_variation.py @@ -76,5 +76,6 @@ def apply(self) -> None: assert self.sampler is not None, "HDRImageVariation: sampler not set." # Pass HDR names as the choice sampler's choices. hdr_name = self.sampler.sample(num_samples=1, choices=hdr_names)[0] + self._record_draw(hdr_name) hdr_cls: type[HDRImage] = registry.get_hdr_by_name(hdr_name) self._light.add_hdr(hdr_cls()) diff --git a/isaaclab_arena/variations/light_intensity_variation.py b/isaaclab_arena/variations/light_intensity_variation.py index ff4c0f4ec..c9713f5fd 100644 --- a/isaaclab_arena/variations/light_intensity_variation.py +++ b/isaaclab_arena/variations/light_intensity_variation.py @@ -48,4 +48,5 @@ def __init__( def apply(self) -> None: assert self.sampler is not None, "LightIntensityVariation: sampler not set." intensity = float(self.sampler.sample(num_samples=1)[0, 0]) + self._record_draw(intensity) self._light.set_intensity(intensity) diff --git a/isaaclab_arena/variations/variation_base.py b/isaaclab_arena/variations/variation_base.py index 21542a02c..86339c880 100644 --- a/isaaclab_arena/variations/variation_base.py +++ b/isaaclab_arena/variations/variation_base.py @@ -55,6 +55,7 @@ class VariationBase(ABC): def __init__(self, cfg: VariationBaseCfg, name: str): self.name = name self._sampler: SamplerBase | None = None + self._last_draw: object | None = None self.apply_cfg(cfg) @property @@ -62,6 +63,21 @@ def enabled(self) -> bool: """Whether this variation is active and should be built into ``events_cfg``.""" return self.cfg.enabled + @property + def last_draw(self) -> object | None: + """The value most recently sampled by this variation, or ``None`` if never sampled. + + Set by subclasses via :meth:`_record_draw` when they realise a sample, so eval recording + can log the realized factor value per build/episode. Build-time variations populate it once + per env build; run-time variations do not record here yet (their per-reset draws would need + a per-env buffer — tracked as a follow-up). + """ + return self._last_draw + + def _record_draw(self, value: object) -> None: + """Store ``value`` as the realized sample of this variation (see :attr:`last_draw`).""" + self._last_draw = value + def enable(self) -> None: """Mark this variation as active.""" self.cfg.enabled = True diff --git a/isaaclab_arena/variations/variation_recording.py b/isaaclab_arena/variations/variation_recording.py new file mode 100644 index 000000000..cdb098d18 --- /dev/null +++ b/isaaclab_arena/variations/variation_recording.py @@ -0,0 +1,87 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Turn an environment's enabled variations into sensitivity-analysis inputs. + +Two views are derived, both keyed by the dotted ``.`` path: + +* :func:`collect_variation_factor_schema` — the run-level *prior* (type + range/choices) + each enabled variation samples from. Emitted once per run into the episode-summary header. +* :func:`collect_variation_draws` — the *realized* value each enabled variation last sampled. + Recorded per build (build-time variations) so the episode writer can log it per episode. + +Only build-time variations populate :attr:`~isaaclab_arena.variations.variation_base.VariationBase.last_draw` +today, so run-time variations contribute schema but no draws yet (see ``last_draw``). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from isaaclab_arena.variations.choice_sampler import ChoiceSampler +from isaaclab_arena.variations.uniform_sampler import UniformSampler + +if TYPE_CHECKING: + from isaaclab_arena.variations.variation_base import VariationBase + + +def collect_variation_draws(variations: dict[str, list[VariationBase]]) -> dict[str, Any]: + """Return ``{.: realized_value}`` for every enabled variation that sampled. + + Args: + variations: ``{asset_name: [variation, ...]}`` as returned by the builder. + + Returns: + One entry per enabled variation whose ``last_draw`` is set. Values are JSON primitives + (float for continuous, str for categorical) so they drop straight into the JSONL. + """ + draws: dict[str, Any] = {} + for asset_name, asset_variations in variations.items(): + for variation in asset_variations: + if not variation.enabled or variation.last_draw is None: + continue + draws[f"{asset_name}.{variation.name}"] = variation.last_draw + return draws + + +def collect_variation_factor_schema(variations: dict[str, list[VariationBase]]) -> dict[str, dict]: + """Return ``{.: factor_spec}`` for every enabled, recognised variation. + + Each ``factor_spec`` mirrors a ``factors.yaml`` entry: ``type`` plus ``range`` (continuous) + derived from the sampler. Categorical ``choices`` are left out — they are inferred from the + observed draws at analysis time, keeping this generic over the choice pool. + + Args: + variations: ``{asset_name: [variation, ...]}`` as returned by the builder. + + Returns: + One entry per enabled variation whose sampler family is recognised; others are skipped. + """ + schema: dict[str, dict] = {} + for asset_name, asset_variations in variations.items(): + for variation in asset_variations: + if not variation.enabled: + continue + factor_spec = _variation_factor_spec(variation) + if factor_spec is not None: + schema[f"{asset_name}.{variation.name}"] = factor_spec + return schema + + +def _variation_factor_spec(variation: VariationBase) -> dict | None: + """Derive a factors.yaml-style spec from a variation's sampler, or ``None`` if unrecognised.""" + sampler = variation.sampler + if isinstance(sampler, UniformSampler): + low = sampler.low.flatten().tolist() + high = sampler.high.flatten().tolist() + return { + "type": "continuous", + "range": [[dim_low, dim_high] for dim_low, dim_high in zip(low, high)], + "distribution": "uniform", + } + if isinstance(sampler, ChoiceSampler): + # Choices are inferred from the observed draws at load time (the pool isn't on the sampler). + return {"type": "categorical"} + return None From cb3d2172b7bf9aeea5b32cb69d3a75682748d1df Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Fri, 12 Jun 2026 15:30:35 +0200 Subject: [PATCH 2/2] Add per-episode sensitivity episode-summary recording Opt-in --episode_summary writes a self-describing JSONL for sensitivity analysis: - A one-time meta header (run slice + enabled-variation factor schema + outcome names), then one episode row each carrying arena_env_args, realized variation_draws, and outcomes. - episode_writer reads per-episode outcomes via MetricsManager.compute_per_episode and the factor schema / draws off the env (attached in load_env). Written per rebuild. - The write is guarded so a JSONL failure can't flip a succeeded job to FAILED. - metrics.get_recorded_metric_data sorts demos by numeric index so per-episode ordering is correct at >= 10 episodes (h5py iterates demo_0, demo_1, demo_10, ... alphabetically). - Job carries arena_env_args_dict (the pre-CLI dict) for logging. The analysis-side reader (SensitivityDataset.from_file) lands with the analyzer. Signed-off-by: Clemens Volk --- isaaclab_arena/evaluation/eval_runner.py | 26 ++++++++++++++++ isaaclab_arena/evaluation/eval_runner_cli.py | 10 ++++++ isaaclab_arena/evaluation/job_manager.py | 5 +++ isaaclab_arena/metrics/metrics.py | 4 ++- isaaclab_arena/metrics/metrics_manager.py | 32 +++++++++++++++++++- 5 files changed, 75 insertions(+), 2 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index c1512700d..3fa5a4a3e 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser +from isaaclab_arena.evaluation.episode_writer import write_episode_summaries from isaaclab_arena.evaluation.eval_runner_cli import add_eval_runner_arguments from isaaclab_arena.evaluation.job_manager import Job, JobManager, Status from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy @@ -51,6 +52,12 @@ def load_env( env_cfg.recorders.dataset_filename = f"dataset_{job_name}" env = arena_builder.make_registered(env_cfg, render_mode=render_mode) + + # Surface the enabled variations so the episode summary writer can record them: the factor + # schema (run-level prior) and this build's realized build-time draws (fixed for the env's life). + env.unwrapped.arena_variation_factor_schema = arena_builder.get_enabled_variation_factor_schema() + env.unwrapped.arena_variation_draws = arena_builder.get_enabled_variation_draws() + # Don't reset here - rollout_policy() will reset the env. Every reset triggers a new episode, initializing recorder & creating a new hdf5 entry. return env @@ -244,6 +251,15 @@ def main(): # Check if any job requires cameras and enable them if needed before starting simulation enable_cameras_if_required(eval_jobs_config, args_cli) + # --episode_summary (opt-in): the writer logs the full arena_env_args + realized variation + # draws per episode; the analyzer's factors / header decide which keys are factors. + episode_summary_enabled = args_cli.episode_summary is not None + if episode_summary_enabled: + print( + "[INFO] Episode summary recording enabled. Per-episode arena_env_args + variation_draws" + f" + outcomes → {args_cli.episode_summary}" + ) + with SimulationAppContext(args_cli): job_manager = JobManager(eval_jobs_config["jobs"]) metrics_logger = MetricsLogger() @@ -307,6 +323,16 @@ def main(): language_instruction=job.language_instruction, ) + if episode_summary_enabled: + # Opt-in instrumentation: a write failure here must not fail an + # otherwise-successful rebuild (which would discard its real metrics). + # Written per rebuild — each re-samples this build's variation draws. + try: + rows = write_episode_summaries(env, job, args_cli.episode_summary) + print(f"[INFO] Wrote {rows} episode summaries for job '{job.name}' (rebuild {rebuild_idx})") + except Exception as summary_error: + print(f"[WARNING] Failed to write episode summaries for job '{job.name}': {summary_error}") + job_manager.complete_job(job, metrics=metrics, status=Status.COMPLETED) # users may not specify metrics for a task, although it's not recommended diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index 15e6131b4..0bef90b80 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -38,3 +38,13 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: " set only if a long sweep grows in host memory or gets OOM-killed." ), ) + parser.add_argument( + "--episode_summary", + type=str, + default=None, + help=( + "Opt-in: append one self-describing JSONL row per episode (arena_env_args, realized" + " variation_draws, and outcomes) to this path, for sensitivity analysis. The first" + " line is a run-level header (slice + factor schema). Unset disables recording." + ), + ) diff --git a/isaaclab_arena/evaluation/job_manager.py b/isaaclab_arena/evaluation/job_manager.py index e33839ab4..c05bc411d 100644 --- a/isaaclab_arena/evaluation/job_manager.py +++ b/isaaclab_arena/evaluation/job_manager.py @@ -31,6 +31,7 @@ def __init__( status: Status = None, language_instruction: str = None, variations: list[str] = None, + arena_env_args_dict: dict = None, ): """Initialize a Job instance. @@ -49,9 +50,12 @@ def __init__( takes precedence over the task's own description. variations: Hydra variation override strings (e.g. ``"cracker_box.color.enabled=true"``) applied when composing the environment cfg. Defaults to no overrides. + arena_env_args_dict: The original dict form of arena_env_args (before conversion to a + CLI list). Logged verbatim per episode by the sensitivity episode-summary writer. """ self.name = name self.arena_env_args = arena_env_args + self.arena_env_args_dict = arena_env_args_dict if arena_env_args_dict is not None else {} self.variations = variations if variations is not None else [] assert num_envs > 0, "num_envs must be greater than 0" assert not ( @@ -114,6 +118,7 @@ def from_dict(cls, data: dict) -> "Job": return cls( name=data["name"], arena_env_args=cls.convert_args_dict_to_cli_args_list(data["arena_env_args"]), + arena_env_args_dict=data["arena_env_args"], policy_type=data["policy_type"], num_envs=num_envs, num_steps=num_steps, diff --git a/isaaclab_arena/metrics/metrics.py b/isaaclab_arena/metrics/metrics.py index d93b30309..66eec02ad 100644 --- a/isaaclab_arena/metrics/metrics.py +++ b/isaaclab_arena/metrics/metrics.py @@ -27,7 +27,9 @@ def get_recorded_metric_data(dataset_path: pathlib.Path, recorder_term_name: str recorded_metric_data_per_demo: list[np.ndarray] = [] with h5py.File(dataset_path, "r") as f: demos = f["data"] - for demo in demos: + # h5py iterates group members alphabetically (demo_0, demo_1, demo_10, demo_2, ...), + # so sort by the numeric demo index to return episodes in recorded order. + for demo in sorted(demos, key=lambda name: int(name.split("_")[-1])): recorded_metric_data_per_demo.append(demos[demo][recorder_term_name][:]) return recorded_metric_data_per_demo diff --git a/isaaclab_arena/metrics/metrics_manager.py b/isaaclab_arena/metrics/metrics_manager.py index 6ee38e010..7e84a2ef5 100644 --- a/isaaclab_arena/metrics/metrics_manager.py +++ b/isaaclab_arena/metrics/metrics_manager.py @@ -5,7 +5,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from isaaclab_arena.metrics.metric_data import MetricData, MetricsDataCollection from isaaclab_arena.metrics.metric_term_cfg import MetricTermCfg @@ -68,3 +68,33 @@ def compute(self) -> MetricsDataCollection: num_episodes=get_num_episodes(dataset_path), metric_data_entries=metric_data_entries ) return metrics_data_collection + + def compute_per_episode(self) -> list[dict[str, Any]]: + """Compute every registered metric separately for each recorded episode. + + Where :meth:`compute` reduces across all episodes to one aggregate value per + metric, this returns one ``{metric_name: value}`` dict per episode — each metric's + compute func is fed that single episode's recorded array (a one-element list). + + Returns: + A list with one metric dict per episode, in recorded order. + """ + dataset_path = get_metric_recorder_dataset_path(self._env) + num_episodes = get_num_episodes(dataset_path) + + # Recorded data arrives grouped by metric (each term -> one array per episode). + # Read it once here, then transpose into one metric dict per episode below. + episode_arrays_by_term = { + term_name: get_recorded_metric_data(dataset_path, term_cfg.recorder_term_name) + for term_name, term_cfg in zip(self._term_names, self._term_cfgs) + } + + per_episode_metrics: list[dict[str, Any]] = [] + for episode_index in range(num_episodes): + episode_metrics: dict[str, Any] = {} + for term_name, term_cfg in zip(self._term_names, self._term_cfgs): + # compute_metric_func reduces a list of per-episode arrays; give it just this one. + episode_array = episode_arrays_by_term[term_name][episode_index] + episode_metrics[term_name] = term_cfg.compute_metric_func([episode_array], **term_cfg.params) + per_episode_metrics.append(episode_metrics) + return per_episode_metrics