Skip to content

Commit 37f253e

Browse files
apapirovskiMylesBorins
authored andcommitted
timers: refactor setImmediate error handling
If an error is encountered during the processing of Immediates, schedule the remaining queue to finish after all error handling code runs (if the process is still alive to do so). The new changes make the Immediates error handling behaviour entirely deterministic and predictable, as the full queue will be flushed on each Immediates cycle, regardless of whether an error is encountered or not. Currently this processing is scheduled for nextTick which can yield unpredictable results as the nextTick might happen as early as close callbacks phase or as late as after the next event loop turns Immediates all fully processed. The latter can result in two full cycles of Immediates processing during one even loop turn. The current implementation also doesn't differentiate between Immediates scheduled for the current queue run or the next one, so Immediates that were scheduled for the next turn of the event loop, will process alongside the ones that were scheduled for the current turn. Backport-PR-URL: #19006 PR-URL: #17879 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 07c6fb9 commit 37f253e

File tree

6 files changed

+160
-51
lines changed

6 files changed

+160
-51
lines changed

lib/timers.js

+36-29
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants;
5050
const async_id_symbol = Symbol('asyncId');
5151
const trigger_async_id_symbol = Symbol('triggerAsyncId');
5252

53-
const [activateImmediateCheck, scheduledImmediateCountArray] =
53+
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
54+
const kCount = 0;
55+
const kHasOutstanding = 1;
56+
57+
const [activateImmediateCheck, immediateInfo] =
5458
setImmediateCallback(processImmediate);
5559

5660
// Timeout values > TIMEOUT_MAX are set to 1.
@@ -674,16 +678,23 @@ ImmediateList.prototype.remove = function(item) {
674678
};
675679

676680
// Create a single linked list instance only once at startup
677-
var immediateQueue = new ImmediateList();
681+
const immediateQueue = new ImmediateList();
682+
683+
// If an uncaught exception was thrown during execution of immediateQueue,
684+
// this queue will store all remaining Immediates that need to run upon
685+
// resolution of all error handling (if process is still alive).
686+
const outstandingQueue = new ImmediateList();
678687

679688

680689
function processImmediate() {
681-
var immediate = immediateQueue.head;
682-
var tail = immediateQueue.tail;
690+
const queue = outstandingQueue.head !== null ?
691+
outstandingQueue : immediateQueue;
692+
var immediate = queue.head;
693+
var tail = queue.tail;
683694

684695
// Clear the linked list early in case new `setImmediate()` calls occur while
685696
// immediate callbacks are executed
686-
immediateQueue.head = immediateQueue.tail = null;
697+
queue.head = queue.tail = null;
687698

688699
while (immediate !== null) {
689700
if (!immediate._onImmediate) {
@@ -692,9 +703,14 @@ function processImmediate() {
692703
}
693704

694705
// Save next in case `clearImmediate(immediate)` is called from callback
695-
var next = immediate._idleNext;
706+
const next = immediate._idleNext;
707+
708+
const asyncId = immediate[async_id_symbol];
709+
emitBefore(asyncId, immediate[trigger_async_id_symbol]);
696710

697-
tryOnImmediate(immediate, tail);
711+
tryOnImmediate(immediate, next, tail);
712+
713+
emitAfter(asyncId);
698714

699715
// If `clearImmediate(immediate)` wasn't called from the callback, use the
700716
// `immediate`'s next item
@@ -703,45 +719,36 @@ function processImmediate() {
703719
else
704720
immediate = next;
705721
}
722+
723+
immediateInfo[kHasOutstanding] = 0;
706724
}
707725

708726
// An optimization so that the try/finally only de-optimizes (since at least v8
709727
// 4.7) what is in this smaller function.
710-
function tryOnImmediate(immediate, oldTail) {
728+
function tryOnImmediate(immediate, next, oldTail) {
711729
var threw = true;
712-
emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]);
713730
try {
714731
// make the actual call outside the try/finally to allow it to be optimized
715732
runCallback(immediate);
716733
threw = false;
717734
} finally {
718735
immediate._onImmediate = null;
719-
if (!threw)
720-
emitAfter(immediate[async_id_symbol]);
721736

722737
if (!immediate._destroyed) {
723738
immediate._destroyed = true;
724-
scheduledImmediateCountArray[0]--;
739+
immediateInfo[kCount]--;
725740

726741
if (async_hook_fields[kDestroy] > 0) {
727742
emitDestroy(immediate[async_id_symbol]);
728743
}
729744
}
730745

731-
if (threw && immediate._idleNext !== null) {
732-
// Handle any remaining on next tick, assuming we're still alive to do so.
733-
const curHead = immediateQueue.head;
734-
const next = immediate._idleNext;
735-
if (curHead !== null) {
736-
curHead._idlePrev = oldTail;
737-
oldTail._idleNext = curHead;
738-
next._idlePrev = null;
739-
immediateQueue.head = next;
740-
} else {
741-
immediateQueue.head = next;
742-
immediateQueue.tail = oldTail;
743-
}
744-
process.nextTick(processImmediate);
746+
if (threw && (immediate._idleNext !== null || next !== null)) {
747+
// Handle any remaining Immediates after error handling has resolved,
748+
// assuming we're still alive to do so.
749+
outstandingQueue.head = immediate._idleNext || next;
750+
outstandingQueue.tail = oldTail;
751+
immediateInfo[kHasOutstanding] = 1;
745752
}
746753
}
747754
}
@@ -775,9 +782,9 @@ function Immediate(callback, args) {
775782
this);
776783
}
777784

778-
if (scheduledImmediateCountArray[0] === 0)
785+
if (immediateInfo[kCount] === 0)
779786
activateImmediateCheck();
780-
scheduledImmediateCountArray[0]++;
787+
immediateInfo[kCount]++;
781788

782789
immediateQueue.append(this);
783790
}
@@ -823,7 +830,7 @@ exports.clearImmediate = function(immediate) {
823830
if (!immediate) return;
824831

825832
if (!immediate._destroyed) {
826-
scheduledImmediateCountArray[0]--;
833+
immediateInfo[kCount]--;
827834
immediate._destroyed = true;
828835

829836
if (async_hook_fields[kDestroy] > 0) {

src/env-inl.h

+31-8
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,30 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const {
217217
return env_->makecallback_cntr_ > 1;
218218
}
219219

220+
inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate)
221+
: fields_(isolate, kFieldsCount) {}
222+
223+
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
224+
Environment::ImmediateInfo::fields() {
225+
return fields_;
226+
}
227+
228+
inline uint32_t Environment::ImmediateInfo::count() const {
229+
return fields_[kCount];
230+
}
231+
232+
inline bool Environment::ImmediateInfo::has_outstanding() const {
233+
return fields_[kHasOutstanding] == 1;
234+
}
235+
236+
inline void Environment::ImmediateInfo::count_inc(uint32_t increment) {
237+
fields_[kCount] = fields_[kCount] + increment;
238+
}
239+
240+
inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
241+
fields_[kCount] = fields_[kCount] - decrement;
242+
}
243+
220244
inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
221245
: fields_(isolate, kFieldsCount) {}
222246

@@ -263,14 +287,14 @@ inline Environment::Environment(IsolateData* isolate_data,
263287
v8::Local<v8::Context> context)
264288
: isolate_(context->GetIsolate()),
265289
isolate_data_(isolate_data),
290+
immediate_info_(context->GetIsolate()),
266291
tick_info_(context->GetIsolate()),
267292
timer_base_(uv_now(isolate_data->event_loop())),
268293
printed_error_(false),
269294
trace_sync_io_(false),
270295
abort_on_uncaught_exception_(false),
271296
emit_napi_warning_(true),
272297
makecallback_cntr_(0),
273-
scheduled_immediate_count_(isolate_, 1),
274298
should_abort_on_uncaught_toggle_(isolate_, 1),
275299
#if HAVE_INSPECTOR
276300
inspector_agent_(new inspector::Agent(this)),
@@ -357,6 +381,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() {
357381
return &async_hooks_;
358382
}
359383

384+
inline Environment::ImmediateInfo* Environment::immediate_info() {
385+
return &immediate_info_;
386+
}
387+
360388
inline Environment::TickInfo* Environment::tick_info() {
361389
return &tick_info_;
362390
}
@@ -486,11 +514,6 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
486514
fs_stats_field_array_ = fields;
487515
}
488516

489-
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
490-
Environment::scheduled_immediate_count() {
491-
return scheduled_immediate_count_;
492-
}
493-
494517
void Environment::SetImmediate(native_immediate_callback cb,
495518
void* data,
496519
v8::Local<v8::Object> obj) {
@@ -500,9 +523,9 @@ void Environment::SetImmediate(native_immediate_callback cb,
500523
std::unique_ptr<v8::Persistent<v8::Object>>(
501524
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
502525
});
503-
if (scheduled_immediate_count_[0] == 0)
526+
if (immediate_info()->count() == 0)
504527
ActivateImmediateCheck();
505-
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1;
528+
immediate_info()->count_inc(1);
506529
}
507530

508531
inline performance::performance_state* Environment::performance_state() {

src/env.cc

+11-9
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,14 @@ void Environment::RunAndClearNativeImmediates() {
281281
}
282282

283283
#ifdef DEBUG
284-
CHECK_GE(scheduled_immediate_count_[0], count);
284+
CHECK_GE(immediate_info()->count(), count);
285285
#endif
286-
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count;
286+
immediate_info()->count_dec(count);
287287
}
288288
}
289289

290290
static bool MaybeStopImmediate(Environment* env) {
291-
if (env->scheduled_immediate_count()[0] == 0) {
291+
if (env->immediate_info()->count() == 0) {
292292
uv_check_stop(env->immediate_check_handle());
293293
uv_idle_stop(env->immediate_idle_handle());
294294
return true;
@@ -307,12 +307,14 @@ void Environment::CheckImmediate(uv_check_t* handle) {
307307

308308
env->RunAndClearNativeImmediates();
309309

310-
MakeCallback(env->isolate(),
311-
env->process_object(),
312-
env->immediate_callback_function(),
313-
0,
314-
nullptr,
315-
{0, 0}).ToLocalChecked();
310+
do {
311+
MakeCallback(env->isolate(),
312+
env->process_object(),
313+
env->immediate_callback_function(),
314+
0,
315+
nullptr,
316+
{0, 0}).ToLocalChecked();
317+
} while (env->immediate_info()->has_outstanding());
316318

317319
MaybeStopImmediate(env);
318320
}

src/env.h

+26-3
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,30 @@ class Environment {
451451
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
452452
};
453453

454+
class ImmediateInfo {
455+
public:
456+
inline AliasedBuffer<uint32_t, v8::Uint32Array>& fields();
457+
inline uint32_t count() const;
458+
inline bool has_outstanding() const;
459+
460+
inline void count_inc(uint32_t increment);
461+
inline void count_dec(uint32_t decrement);
462+
463+
private:
464+
friend class Environment; // So we can call the constructor.
465+
inline explicit ImmediateInfo(v8::Isolate* isolate);
466+
467+
enum Fields {
468+
kCount,
469+
kHasOutstanding,
470+
kFieldsCount
471+
};
472+
473+
AliasedBuffer<uint32_t, v8::Uint32Array> fields_;
474+
475+
DISALLOW_COPY_AND_ASSIGN(ImmediateInfo);
476+
};
477+
454478
class TickInfo {
455479
public:
456480
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
@@ -530,6 +554,7 @@ class Environment {
530554
inline void FinishHandleCleanup(uv_handle_t* handle);
531555

532556
inline AsyncHooks* async_hooks();
557+
inline ImmediateInfo* immediate_info();
533558
inline TickInfo* tick_info();
534559
inline uint64_t timer_base() const;
535560

@@ -577,8 +602,6 @@ class Environment {
577602
inline double* fs_stats_field_array() const;
578603
inline void set_fs_stats_field_array(double* fields);
579604

580-
inline AliasedBuffer<uint32_t, v8::Uint32Array>& scheduled_immediate_count();
581-
582605
inline performance::performance_state* performance_state();
583606
inline std::map<std::string, uint64_t>* performance_marks();
584607

@@ -686,6 +709,7 @@ class Environment {
686709
uv_check_t idle_check_handle_;
687710

688711
AsyncHooks async_hooks_;
712+
ImmediateInfo immediate_info_;
689713
TickInfo tick_info_;
690714
const uint64_t timer_base_;
691715
bool printed_error_;
@@ -695,7 +719,6 @@ class Environment {
695719
size_t makecallback_cntr_;
696720
std::vector<double> destroy_async_id_list_;
697721

698-
AliasedBuffer<uint32_t, v8::Uint32Array> scheduled_immediate_count_;
699722
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
700723

701724
int should_not_abort_scope_counter_ = 0;

src/timer_wrap.cc

+3-2
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ class TimerWrap : public HandleWrap {
9090
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
9191
.ToLocalChecked();
9292
auto result = Array::New(env->isolate(), 2);
93-
result->Set(0, activate_function);
94-
result->Set(1, env->scheduled_immediate_count().GetJSArray());
93+
result->Set(env->context(), 0, activate_function).FromJust();
94+
result->Set(env->context(), 1,
95+
env->immediate_info()->fields().GetJSArray()).FromJust();
9596
args.GetReturnValue().Set(result);
9697
}
9798

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const domain = require('domain');
6+
7+
// setImmediate should run clear its queued cbs once per event loop turn
8+
// but immediates queued while processing the current queue should happen
9+
// on the next turn of the event loop.
10+
11+
// In addition, if any setImmediate throws, the rest of the queue should
12+
// be processed after all error handling is resolved, but that queue
13+
// should not include any setImmediate calls scheduled after the
14+
// processing of the queue started.
15+
16+
let threw = false;
17+
let stage = -1;
18+
19+
const QUEUE = 10;
20+
21+
const errObj = {
22+
type: Error,
23+
message: 'setImmediate Err'
24+
};
25+
26+
process.once('uncaughtException', common.expectsError(errObj));
27+
process.once('uncaughtException', () => assert.strictEqual(stage, 0));
28+
29+
const d1 = domain.create();
30+
d1.once('error', common.expectsError(errObj));
31+
d1.once('error', () => assert.strictEqual(stage, 0));
32+
33+
const run = common.mustCall((callStage) => {
34+
assert(callStage >= stage);
35+
stage = callStage;
36+
if (threw)
37+
return;
38+
39+
setImmediate(run, 2);
40+
}, QUEUE * 3);
41+
42+
for (let i = 0; i < QUEUE; i++)
43+
setImmediate(run, 0);
44+
setImmediate(() => {
45+
threw = true;
46+
process.nextTick(() => assert.strictEqual(stage, 1));
47+
throw new Error('setImmediate Err');
48+
});
49+
d1.run(() => setImmediate(() => {
50+
throw new Error('setImmediate Err');
51+
}));
52+
for (let i = 0; i < QUEUE; i++)
53+
setImmediate(run, 1);

0 commit comments

Comments
 (0)