Skip to content

[Python] Optimize BigQuery copy jobs in file loads using multi-source copy#38983

Open
stankiewicz wants to merge 4 commits into
apache:masterfrom
stankiewicz:improve_copy_table
Open

[Python] Optimize BigQuery copy jobs in file loads using multi-source copy#38983
stankiewicz wants to merge 4 commits into
apache:masterfrom
stankiewicz:improve_copy_table

Optimize BigQuery copy jobs in file loads using multi-source copy

af3b026
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
GitHub Actions / Python 3.13 Test Results (ubuntu-latest) failed Jun 16, 2026 in 0s

17 errors, 12 skipped, 1 pass in 11m 38s

 2 files  + 2  2 suites  +2   11m 38s ⏱️ + 11m 38s
30 tests +30  1 ✅ +1  12 💤 +12  0 ❌ ±0  17 🔥 +17 
37 runs  +37  1 ✅ +1  19 💤 +19  0 ❌ ±0  17 🔥 +17 

Results for commit af3b026. ± Comparison against earlier commit 898dca4.

Annotations

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_missing_primary_key_in_entity (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 33s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_non_existent_collection (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_write_on_non_existent_partition (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_on_auto_id_primary_key (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_on_existent_collection_with_default_schema (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_with_batching (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 0s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_write_with_custom_column_specifications (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f2762a14e90>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f2762a08550>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_vector_search_with_inner_product_similarity (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 35s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f048194c050>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f0481803d90>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_query_on_non_existent_collection (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 33s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f64a696b360>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f64a685c2d0>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_empty_input_chunks (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 33s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7fc3aaf5b5c0>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7fc3aaeb8050>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_invalid_query_on_non_existent_field (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f64a696b360>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f64a685c2d0>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_filtered_search_with_bm25_full_text_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7fc3aaf5b5c0>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7fc3aaeb8050>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_idempotent_write (apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 6s]
Raw output
failed on setup with "TimeoutError: container did not become running"
cls = <class 'apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/ingestion/milvus_search_it_test.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:217: in start_db_container
    raise e
apache_beam/ml/rag/test_utils.py:189: in start_db_container
    port = running_container.get_exposed_port(service_container_port)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/container.py:331: in get_exposed_port
    C().wait_until_ready(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.wait_strategies.ContainerStatusWaitStrategy object at 0x7f04819e3df0>
container = <apache_beam.ml.rag.test_utils.CustomMilvusContainer object at 0x7f04819a8cb0>

    def wait_until_ready(self, container: WaitStrategyTarget) -> None:
        result = self._poll(lambda: self.running(self.get_status(container)))
        if not result:
>           raise TimeoutError("container did not become running")
E           TimeoutError: container did not become running

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/testcontainers/core/wait_strategies.py:672: TimeoutError

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_keyword_search_with_inner_product_sparse_embedding (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa94482c7d0>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7fa9448a1770>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa946c3a660>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa94482c7d0>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:50535, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa946c3a660>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_filtered_search_with_cosine_similarity_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 10m 0s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f0418e90050>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7f0418eb14f0>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f0579862200>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f0418e90050>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:34955, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f0579862200>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_vector_search_with_euclidean_distance (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa94482c7d0>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7fa9448a1770>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa946c3a660>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fa94482c7d0>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:50535, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7fa946c3a660>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check failure on line 0 in apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

test_hybrid_search (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) with error

sdks/python/test-suites/tox/py313/build/srcs/sdks/python/pytest_py313-ml.xml [took 1s]
Raw output
failed on setup with "Failed: Timeout (>600.0s) from pytest-timeout."
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f0418e90050>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
>           grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:160: in result
    self._block(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <grpc._utilities._ChannelReadyFuture object at 0x7f0418eb14f0>
timeout = 10

    def _block(self, timeout: Optional[float]) -> None:
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                else:
                    remaining = until - time.time()
                    if remaining < 0:
>                       raise grpc.FutureTimeoutError()
E                       grpc.FutureTimeoutError

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/grpc/_utilities.py:105: FutureTimeoutError

The above exception was the direct cause of the following exception:

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f0579862200>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
>         result = operation()
                   ^^^^^^^^^^^

apache_beam/ml/rag/utils.py:200: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:147: in list_collections_probe
    client = MilvusClient(uri=uri)
             ^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:89: in __init__
    self._handler = self._manager.get_or_create(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:500: in get_or_create
    return self._create_shared(config, client, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/connection_manager.py:524: in _create_shared
    handler._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f0418e90050>
timeout = None

    def _wait_for_channel_ready(self, timeout: Optional[float] = 10):
        if self._channel is None:
            raise MilvusException(
                code=Status.CONNECT_FAILED,
                message="No channel in handler, please setup grpc channel first",
            )
    
        # grpc.Future.result(timeout=None) blocks indefinitely.  Normalise None
        # to the default 10 s so that an unreachable URI raises MilvusException
        # instead of hanging forever (mirrors async ensure_channel_ready behaviour).
        effective_timeout = timeout if timeout is not None else 10
    
        try:
            grpc.channel_ready_future(self._channel).result(timeout=effective_timeout)
            self._setup_identifier_interceptor(self._user, timeout=effective_timeout)
        except grpc.FutureTimeoutError as e:
            self.close()
>           raise MilvusException(
                code=Status.CONNECT_FAILED,
                message=f"Fail connecting to server on {self._address}, illegal connection params or server unavailable",
            ) from e
E           pymilvus.exceptions.MilvusException: <MilvusException: (code=2, message=Fail connecting to server on localhost:34955, illegal connection params or server unavailable)>

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:235: MilvusException

During handling of the above exception, another exception occurred:

cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusTestHelpers.start_db_container()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/test_utils.py:191: in start_db_container
    MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
apache_beam/ml/rag/test_utils.py:153: in _wait_for_milvus_grpc
    retry_with_backoff(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

operation = <function MilvusTestHelpers._wait_for_milvus_grpc.<locals>.list_collections_probe at 0x7f0579862200>
max_retries = 25, retry_delay = 2.0, retry_backoff_factor = 1.2
operation_name = 'Milvus client connection after container start'
exception_types = (<class 'pymilvus.exceptions.MilvusException'>,)

    def retry_with_backoff(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: tuple[type[BaseException], ...] = (Exception, )
    ) -> Any:
      """Executes an operation with retry logic and exponential backoff.
    
      This is a generic retry utility that can be used for any operation that may
      fail transiently. It retries the operation with exponential backoff between
      attempts.
    
      Note:
        This utility is designed for one-time setup operations and complements
        Apache Beam's RequestResponseIO pattern. Use retry_with_backoff() for:
    
        * Establishing client connections in __enter__() methods (e.g., creating
          MilvusClient instances, database connections) before processing elements
        * One-time setup/teardown operations in DoFn lifecycle methods
        * Operations outside of per-element processing where retry is needed
    
        For per-element operations (e.g., API calls within Caller.__call__),
        use RequestResponseIO which already provides automatic retry with
        exponential backoff, failure handling, caching, and other features.
        See: https://beam.apache.org/documentation/io/built-in/webapis/
    
      Args:
        operation: Callable that performs the operation to retry. Should return
          the result of the operation.
        max_retries: Maximum number of retry attempts. Default is 3.
        retry_delay: Initial delay in seconds between retries. Default is 1.0.
        retry_backoff_factor: Multiplier for the delay after each retry. Default
          is 2.0 (exponential backoff).
        operation_name: Name of the operation for logging purposes. Default is
          "operation".
        exception_types: Tuple of exception types to catch and retry. Default is
          (Exception,) which catches all exceptions.
    
      Returns:
        The result of the operation if successful.
    
      Raises:
        The last exception encountered if all retry attempts fail.
    
      Example:
        >>> def connect_to_service():
        ...     return service.connect(host="localhost")
        >>> client = retry_with_backoff(
        ...     connect_to_service,
        ...     max_retries=5,
        ...     retry_delay=2.0,
        ...     operation_name="service connection")
      """
      last_exception = None
      for attempt in range(max_retries + 1):
        try:
          result = operation()
          _LOGGER.info(
              "Successfully completed %s on attempt %d",
              operation_name,
              attempt + 1)
          return result
        except exception_types as e:
          last_exception = e
          if attempt < max_retries:
            delay = retry_delay * (retry_backoff_factor**attempt)
            _LOGGER.warning(
                "%s attempt %d failed: %s. Retrying in %.2f seconds...",
                operation_name,
                attempt + 1,
                e,
                delay)
>           time.sleep(delay)
E           Failed: Timeout (>600.0s) from pytest-timeout.

apache_beam/ml/rag/utils.py:216: Failed

Check notice on line 0 in .github

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

12 skipped tests found

There are 12 skipped tests, see "Raw output" for the full list of skipped tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test

Check notice on line 0 in .github

See this annotation in the file changed.

@github-actions github-actions / Python 3.13 Test Results (ubuntu-latest)

30 tests found

There are 30 tests, see "Raw output" for the full list of tests.
Raw output
apache_beam.ml.inference.anthropic_inference_it_test
apache_beam.ml.inference.anthropic_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_empty_input_chunks
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_bm25_full_text_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_cosine_similarity_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_hybrid_search
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_collection
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_field
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_keyword_search_with_inner_product_sparse_embedding
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_euclidean_distance
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_inner_product_similarity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_idempotent_write
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_missing_primary_key_in_entity
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_invalid_write_on_non_existent_partition
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_auto_id_primary_key
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_on_existent_collection_with_default_schema
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.milvus_search_it_test.TestMilvusVectorWriterConfig ‑ test_write_with_custom_column_specifications
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_both_dense_and_sparse
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_dense_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_on_non_existent_collection
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_sparse_embeddings_only
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_batching
apache_beam.ml.rag.ingestion.qdrant_it_test.TestQdrantIngestion ‑ test_write_with_byte_size_limit
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test