Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ete_test_gpu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ jobs:
unset HTTP_PROXY;unset HTTPS_PROXY;unset http_proxy;unset https_proxy;
CASE_NAME="${{ github.event.inputs.run_case || 'all' }}"
if [ "$CASE_NAME" = "all" ]; then
pytest autotest/test_all.py -m all -n 1 -vv --run_id ${{ github.run_id }}
pytest autotest/test_all.py -m all -n 1 -s -vv --run_id ${{ github.run_id }}
else
pytest autotest/test_all.py::test_all[$CASE_NAME] -m all -n 1 -vv --run_id ${{ github.run_id }}
pytest autotest/test_all.py::test_all[$CASE_NAME] -m all -n 1 -s -vv --run_id ${{ github.run_id }}
fi

- name: Check report files
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ete_test_npu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ jobs:
unset HTTP_PROXY;unset HTTPS_PROXY;unset http_proxy;unset https_proxy;
CASE_NAME="${{ github.event.inputs.run_case || 'all' }}"
if [ "$CASE_NAME" = "all" ]; then
export DEVICE=npu && pytest autotest/test_all.py -m all -n 1 -vv --run_id ${{ github.run_id }}
export DEVICE=npu && pytest autotest/test_all.py -m all -n 1 -s -vv --run_id ${{ github.run_id }}
else
export DEVICE=npu && pytest autotest/test_all.py::test_all[$CASE_NAME] -m all -n 1 -vv --run_id ${{ github.run_id }}
export DEVICE=npu && pytest autotest/test_all.py::test_all[$CASE_NAME] -m all -n 1 -s -vv --run_id ${{ github.run_id }}
fi

- name: Check report files
Expand Down
84 changes: 79 additions & 5 deletions autotest/cluster/clusterx.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import re
import time
import traceback
from typing import Any, Dict, Optional

from clusterx.config import CLUSTER
from clusterx.launcher import CLUSTER_MAPPING
from clusterx.launcher.base import JobStatus
from clusterx.launcher.base import JobSchema, JobStatus


# Seconds that _lookup_job_schema sleeps between two consecutive lookup attempts.
JOB_LOOKUP_RETRY_INTERVAL_S = 5
# Total number of lookup attempts _lookup_job_schema makes before returning None.
JOB_LOOKUP_RETRY_TIMES = 6


class ClusterTaskExecutor:
Expand Down Expand Up @@ -36,9 +41,9 @@ def execute_task(self, task_config: Dict[str, Any]):

all_command.append(command)
run_command = "; ".join(all_command)
job_name = "-".join([task_config["type"], task_config["case_name"], task_config["run_id"]])

try:
job_name = "-".join([task_config["type"], task_config["case_name"], task_config["run_id"]])
params = self.params_cls(
job_name=job_name,
cmd=run_command,
Expand All @@ -50,13 +55,22 @@ def execute_task(self, task_config: Dict[str, Any]):
num_nodes=resource.get("num_nodes", 1),
image=resource.get("image", None),
no_env=resource.get("no_env", True),
image_pull_policy=resource.get("image_pull_policy","Always"),
image_pull_policy=resource.get("image_pull_policy", "Always"),
)

job_schema = self.cluster.run(params)
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"clusterx job {job_name} start fail, task config is {task_config}, exception is: {e}")
job_schema = self._lookup_job_schema(job_name)
if job_schema is None:
raise RuntimeError(
f"clusterx job {job_name} start fail and lookup found no matching job, "
f"task config is {task_config}, exception is: {e}"
)
print(
f"clusterx job {job_name} submit error recovered via lookup: "
f"job_id={job_schema.job_id}, status={job_schema.status}, original exception: {e}"
)

start_time = time.time()
run_start_time = None
Expand All @@ -68,7 +82,7 @@ def execute_task(self, task_config: Dict[str, Any]):
if status in [JobStatus.SUCCEEDED]:
run_time = time.time() - run_start_time
if run_time >= timeout:
return False, f'Task succeeded, but run time is {run_time}, exceeding then {timeout}'
return False, f"Task succeeded, but run time is {run_time}, exceeding then {timeout}"
else:
return True, "Task succeeded"
elif status in [JobStatus.FAILED, JobStatus.STOPPED]:
Expand All @@ -91,6 +105,66 @@ def execute_task(self, task_config: Dict[str, Any]):
)
time.sleep(10)

@staticmethod
def _job_name_matches(candidate: str | None, job_name: str) -> bool:
if not candidate:
return False
return candidate == job_name or candidate.startswith(f"{job_name}-")

def _pick_latest_job(self, jobs: list[JobSchema]) -> JobSchema:
return max(jobs, key=lambda job: job.job_id or job.job_name or "")

def _lookup_job_schema_once(self, job_name: str) -> JobSchema | None:
try:
return self.cluster.get_job_info(job_name)
except Exception:
pass

name_regex = rf"^{re.escape(job_name)}(-.*)?$"
try:
jobs = self.cluster.list_jobs(regex=name_regex, num=50)
if jobs:
return self._pick_latest_job(jobs)
except Exception as e:
print(f"list_jobs lookup for {job_name} failed: {e}")

client = getattr(self.cluster, "client", None)
get_job_name = getattr(self.cluster, "_get_job_name", None)
if client is not None and get_job_name is not None:
try:
matched_names = [
get_job_name(job)
for job in (client.list() or [])
if self._job_name_matches(get_job_name(job), job_name)
]
if matched_names:
return self.cluster.get_job_info(max(matched_names))
except Exception as e:
print(f"brainpp client list lookup for {job_name} failed: {e}")

try:
jobs = self.cluster.list_jobs(num=100)
matched = [job for job in jobs if self._job_name_matches(job.job_id, job_name)]
if matched:
return self._pick_latest_job(matched)
except Exception as e:
print(f"generic list_jobs lookup for {job_name} failed: {e}")

return None

def _lookup_job_schema(self, job_name: str) -> JobSchema | None:
    """Repeatedly look up ``job_name`` until it is found or attempts run out.

    Calls ``_lookup_job_schema_once`` up to ``JOB_LOOKUP_RETRY_TIMES`` times,
    sleeping ``JOB_LOOKUP_RETRY_INTERVAL_S`` seconds between attempts. There
    is no sleep after the final attempt.

    Returns:
        The first non-``None`` lookup result, or ``None`` if every attempt
        came back empty.
    """
    final_attempt = JOB_LOOKUP_RETRY_TIMES
    for attempt in range(1, final_attempt + 1):
        found = self._lookup_job_schema_once(job_name)
        if found is not None:
            return found
        if attempt >= final_attempt:
            # Last try exhausted: skip the pointless message and wait.
            break
        print(
            f"Job {job_name} not found on attempt {attempt}/{JOB_LOOKUP_RETRY_TIMES}, "
            f"retry in {JOB_LOOKUP_RETRY_INTERVAL_S}s"
        )
        time.sleep(JOB_LOOKUP_RETRY_INTERVAL_S)
    return None

def get_task_status(self, job_id: str) -> Optional[JobStatus]:
try:
status = self.cluster.get_job_info(job_id).status
Expand Down
2 changes: 2 additions & 0 deletions autotest/config-npu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ case:
npu-qwen3-sft-ep8:
-
type: sft
phase: first
parameters:
config: autotest/config/npu_qwen3_moe_30BA3_ep8.py
output_path: /mnt/hwfile/llmrazor/qa-llm-cicd/test_output
Expand All @@ -80,6 +81,7 @@ case:
timeout: 10800
-
type: sft
phase: resume
pre_action:
command: 'python ./autotest/utils/update_meta.py /mnt/hwfile/llmrazor/qa-llm-cicd/test_output npu-qwen3-sft-ep8 sft'
parameters:
Expand Down
16 changes: 12 additions & 4 deletions autotest/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ case:
qwen3-sft-ep8:
-
type: sft
phase: first
parameters:
config: autotest/config/qwen3_moe_30BA3_ep8.py
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
Expand All @@ -79,6 +80,7 @@ case:
timeout: 1500
-
type: sft
phase: resume
pre_action:
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-sft-ep8 sft'
parameters:
Expand Down Expand Up @@ -352,6 +354,7 @@ case:
- DATA_PATH=/mnt/shared-storage-user/llmrazor-share/data/ci_vl
- MEDIA_ROOT=/mnt/shared-storage-user/llmrazor-share/data/ci_vl
- XTUNER_DETERMINISTIC=true
- TORCH_ALLOW_TF32_CUBLAS_OVERRIDE=0
assert_info:
base_metric: qwen3-sft-vl-dense/tracker.jsonl
check_metrics:
Expand Down Expand Up @@ -474,6 +477,7 @@ case:
qwen3-5-sft-sp4-resume:
-
type: sft
phase: first
parameters:
config: autotest/config/qwen3_5_moe_30BA3_sp4.py
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
Expand All @@ -498,6 +502,7 @@ case:

-
type: sft
phase: resume
pre_action:
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-sft-sp4-resume sft'
parameters:
Expand Down Expand Up @@ -607,6 +612,7 @@ case:
qwen3-5-sft-sp4-resume-vl:
-
type: sft
phase: first
parameters:
config: autotest/config/qwen3_5_moe_30BA3_sp4_vl.py
output_path: /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output
Expand All @@ -633,6 +639,7 @@ case:

-
type: sft
phase: resume
pre_action:
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-sft-sp4-resume-vl sft'
parameters:
Expand Down Expand Up @@ -745,7 +752,7 @@ case:
operator: <=
-
metric: response/response_len/mean
threshold: 0.12
threshold: 0.15
method: relative
operator: <
-
Expand Down Expand Up @@ -841,7 +848,7 @@ case:
threshold: 20
method: absolute
operator: <
timeout: 7200
timeout: 9000

qwen3-5-rl-vl-lmdeploy-dapo:
-
Expand Down Expand Up @@ -885,11 +892,12 @@ case:
threshold: 20
method: absolute
operator: <
timeout: 7200
timeout: 9000

qwen3-5-rl-vl-lmdeploy-resume:
-
type: rl
phase: first
parameters:
config: autotest/config/rl_qwen3p5_vl_35B_dapo_ep2_resume.py
infer_backend: lmdeploy
Expand Down Expand Up @@ -934,6 +942,7 @@ case:

-
type: rl
phase: resume
pre_action:
command: 'python ./autotest/utils/update_meta.py /mnt/shared-storage-user/llmrazor-share/qa-llm-cicd/test_output qwen3-5-rl-vl-lmdeploy-resume rl'
parameters:
Expand Down Expand Up @@ -1041,7 +1050,6 @@ case:
- XTUNER_USE_LMDEPLOY=0
- XTUNER_USE_VLLM=0
- XTUNER_USE_SGLANG=1
- TORCH_ALLOW_TF32_CUBLAS_OVERRIDE=0
- XTUNER_USE_FA3=0
assert_info:
base_metric: qwen3-rl-sglang/tracker.jsonl
Expand Down
Loading
Loading