Skip to content

bpo-27144: concurrent.futures as_completed() and map() iterators do not keep a reference to returned objects #1560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when):

return waiter


def _yield_and_decref(fs, ref_collect):
"""
Iterate on the list *fs*, yielding objects one by one in reverse order.
Before yielding an object, it is removed from each set in
the collection of sets *ref_collect*.
"""
while fs:
for futures_set in ref_collect:
futures_set.remove(fs[-1])
# Careful not to keep a reference to the popped value
yield fs.pop()


def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.

Expand All @@ -191,16 +205,18 @@ def as_completed(fs, timeout=None):
if timeout is not None:
end_time = timeout + time.time()

total_futures = len(fs)

fs = set(fs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

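This is a regression: the ordering of these statements is wrong. Because `len(fs)` is now evaluated before `fs = set(fs)`, the caller's `fs` must support `len()` (for example, be a sequence); an arbitrary iterable that has no length no longer works.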

with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

finished = list(finished)
try:
yield from finished
yield from _yield_and_decref(finished, ref_collect=(fs,))

while pending:
if timeout is None:
Expand All @@ -210,7 +226,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
len(pending), total_futures))

waiter.event.wait(wait_timeout)

Expand All @@ -219,9 +235,9 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()

for future in finished:
yield future
pending.remove(future)
# reverse to keep finishing order
finished.reverse()
yield from _yield_and_decref(finished, ref_collect=(fs, pending))

finally:
for f in fs:
Expand Down Expand Up @@ -551,11 +567,14 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
# before the first iterator value is required.
def result_iterator():
try:
for future in fs:
# reverse so that pop() from the end yields futures in the original order of fs
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield future.result()
yield fs.pop().result()
else:
yield future.result(end_time - time.time())
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
Expand Down
14 changes: 13 additions & 1 deletion Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,18 @@ def _check_system_limits():
raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
"""
Specialized implementation of itertools.chain.from_iterable.
Each item in *iterable* should be a list. This function is
careful not to keep references to yielded objects.
"""
for element in iterable:
element.reverse()
while element:
yield element.pop()


class BrokenProcessPool(RuntimeError):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
Expand Down Expand Up @@ -482,7 +494,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
return itertools.chain.from_iterable(results)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True):
with self._shutdown_lock:
Expand Down
48 changes: 48 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def my_method(self):
pass


def make_dummy_object(_):
    """Build and return a new MyObject; the single argument is ignored."""
    fresh = MyObject()
    return fresh


class BaseTestCase(unittest.TestCase):
def setUp(self):
self._thread_key = test.support.threading_setup()
Expand Down Expand Up @@ -396,6 +400,38 @@ def test_duplicate_futures(self):
completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1)

def test_free_reference_yielded_future(self):
    # Issue #14406: Generator should not keep references
    # to finished futures.
    # NOTE(review): the change that introduces this test is titled
    # bpo-27144; confirm which issue number this comment should cite.

    # Imported here so the method does not rely on the module-level
    # import list already providing the FINISHED state constant.
    from concurrent.futures._base import FINISHED

    # Eight futures built with Future(), plus two built through
    # create_future() with an explicit state.
    futures_list = [Future() for _ in range(8)]
    futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
    # *state* takes a state constant.  SUCCESSFUL_FUTURE is used elsewhere
    # in this file as a future object, not as a state, so passing it here
    # would not produce a future that as_completed() treats as finished.
    futures_list.append(create_future(state=FINISHED))

    with self.assertRaises(futures.TimeoutError):
        for future in futures.as_completed(futures_list, timeout=0):
            # Drop every reference held by the test itself; the weak
            # reference can then only stay alive through the generator.
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())

    futures_list[0].set_result("test")
    for future in futures.as_completed(futures_list):
        futures_list.remove(future)
        wr = weakref.ref(future)
        del future
        self.assertIsNone(wr())
        # Give the next remaining future a result so that the iterator
        # has something to yield on its next step.
        if futures_list:
            futures_list[0].set_result("test")

def test_correct_timeout_exception_msg(self):
    # as_completed() with timeout=0 over four prepared futures must raise
    # TimeoutError, and the message must report 2 of the 4 as unfinished.
    prepared = [
        CANCELLED_AND_NOTIFIED_FUTURE,
        PENDING_FUTURE,
        RUNNING_FUTURE,
        SUCCESSFUL_FUTURE,
    ]

    with self.assertRaises(futures.TimeoutError) as caught:
        # Exhaust the iterator so the timeout path is actually reached.
        list(futures.as_completed(prepared, timeout=0))

    message = str(caught.exception)
    self.assertEqual(message, '2 (of 4) futures unfinished')


class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase):
pass
Expand All @@ -421,6 +457,10 @@ def test_map(self):
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))

self.assertEqual(
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))

def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
Expand Down Expand Up @@ -471,6 +511,14 @@ def test_max_workers_negative(self):
"than 0"):
self.executor_type(max_workers=number)

def test_free_reference(self):
    # Issue #14406: Result iterator should not keep an internal
    # reference to result objects.
    # NOTE(review): the change that introduces this test is titled
    # bpo-27144; confirm which issue number this comment should cite.
    results = self.executor.map(make_dummy_object, range(10))
    for result in results:
        weak_result = weakref.ref(result)
        # Release the only reference the test holds before checking, so a
        # live weak reference would mean the iterator still holds the object.
        del result
        self.assertIsNone(weak_result())


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
def test_map_submits_without_iteration(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The ``map()`` and ``as_completed()`` iterators in ``concurrent.futures``
now avoid keeping a reference to yielded objects.