Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Executor Objects
Signal the executor that it should free any resources that it is using
when the currently pending futures are done executing. Calls to
:meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
raise :exc:`RuntimeError`.
raise :exc:`ExecutorShutdownError`.

If *wait* is ``True`` then this method will not return until all the
pending futures are done executing and the resources associated with the
Expand Down Expand Up @@ -705,6 +705,14 @@ Exception classes

.. versionadded:: 3.8

.. exception:: ExecutorShutdownError

Derived from :exc:`RuntimeError`, this exception class is raised when work
is submitted to an executor that cannot accept new futures because it has
been shut down.

.. versionadded:: 3.16

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool
Expand Down
2 changes: 2 additions & 0 deletions Lib/concurrent/futures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CancelledError,
TimeoutError,
InvalidStateError,
ExecutorShutdownError,
BrokenExecutor,
Future,
Executor,
Expand All @@ -27,6 +28,7 @@
'CancelledError',
'TimeoutError',
'InvalidStateError',
'ExecutorShutdownError',
'BrokenExecutor',
'Future',
'Executor',
Expand Down
4 changes: 4 additions & 0 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class InvalidStateError(Error):
"""The operation is not allowed in this state."""
pass

class ExecutorShutdownError(RuntimeError):
"""Raised when submitting work to an executor that cannot accept new futures."""
pass

class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
Expand Down
7 changes: 4 additions & 3 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,11 @@ def submit(self, fn, /, *args, **kwargs):
if self._broken:
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
raise _base.ExecutorShutdownError(
'cannot schedule new futures after shutdown')
if _global_shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
raise _base.ExecutorShutdownError(
'cannot schedule new futures after interpreter shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
7 changes: 4 additions & 3 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ def submit(self, fn, /, *args, **kwargs):
raise self.BROKEN(self._broken)

if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
raise _base.ExecutorShutdownError(
'cannot schedule new futures after shutdown')
if _shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
raise _base.ExecutorShutdownError(
'cannot schedule new futures after interpreter shutdown')

f = _base.Future()
task = self._resolve_work_item_task(fn, args, kwargs)
Expand Down
10 changes: 10 additions & 0 deletions Lib/test/test_concurrent_futures/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import unittest
import sys
import io
from concurrent import futures
from concurrent.futures._base import BrokenExecutor
from concurrent.futures.process import _check_system_limits

Expand Down Expand Up @@ -39,6 +40,15 @@ def init_fail(log_queue=None):
raise ValueError('error in initializer')


class PublicAPITest(unittest.TestCase):
def test_executor_shutdown_error(self):
from concurrent.futures import ExecutorShutdownError

self.assertIs(ExecutorShutdownError, futures.ExecutorShutdownError)
self.assertIn("ExecutorShutdownError", futures.__all__)
self.assertTrue(issubclass(ExecutorShutdownError, RuntimeError))


class InitializerMixin(ExecutorMixin):
worker_count = 2

Expand Down
3 changes: 2 additions & 1 deletion Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ def test_force_shutdown_workers_stops_pool(self, function_name):
worker_process = list(executor._processes.values())[0]
getattr(executor, function_name)()

self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
self.assertRaises(futures.ExecutorShutdownError,
executor.submit, time.sleep, 0)

# A signal sent, is not a signal reacted to.
# So wait a moment here for the process to die.
Expand Down
19 changes: 11 additions & 8 deletions Lib/test/test_concurrent_futures/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ def sleep_and_print(t, msg):
class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
with self.assertRaises(futures.ExecutorShutdownError) as cm:
self.executor.submit(pow, 2, 5)
self.assertIsInstance(cm.exception, RuntimeError)
self.assertEqual(str(cm.exception),
"cannot schedule new futures after shutdown")

def test_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
Expand Down Expand Up @@ -59,10 +61,10 @@ def test_submit_after_interpreter_shutdown(self):
def run_last():
try:
t.submit(id, None)
except RuntimeError:
print("runtime-error")
except ExecutorShutdownError:
print("executor-shutdown-error")
raise
from concurrent.futures import {executor_type}
from concurrent.futures import {executor_type}, ExecutorShutdownError
if __name__ == "__main__":
context = '{context}'
if not context:
Expand All @@ -76,8 +78,9 @@ def run_last():
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")
self.assertIn("ExecutorShutdownError: cannot schedule new futures",
err.decode())
self.assertEqual(out.strip(), b"executor-shutdown-error")

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_issue12364(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add ``concurrent.futures.ExecutorShutdownError``, a :exc:`RuntimeError`
subclass raised when submitting work to an executor that cannot accept new
futures because it has been shut down.
Loading