added async:remove to remove in-progress tasks, and async.wrap_iterator for turning iteration asynchronous

This commit is contained in:
Max Cahill 2023-08-07 17:13:50 +10:00
parent 97bf19a787
commit 758533445c

View File

@ -30,15 +30,15 @@ end
local capture_callstacks local capture_callstacks
if love and love.system.getOS() == 'Web' then if love and love.system.getOS() == 'Web' then
-- Do no extra wrapping under lovejs because using xpcall causes "attempt --do no extra wrapping under lovejs because using xpcall
-- to yield across metamethod/C-call boundary" -- causes a yield across a c call boundary
capture_callstacks = function(f) capture_callstacks = function(f)
return f return f
end end
else else
capture_callstacks = function(f) capture_callstacks = function(f)
-- Report errors with the coroutine's callstack instead of one coming --report errors with the coroutine's callstack instead of one coming
-- from async:update. -- from async:update
return function(...) return function(...)
local results = {xpcall(f, debug.traceback, ...)} local results = {xpcall(f, debug.traceback, ...)}
local success = table.remove(results, 1) local success = table.remove(results, 1)
@ -54,21 +54,35 @@ end
function async:call(f, args, callback, error_callback) function async:call(f, args, callback, error_callback)
assert:type_or_nil(args, "table", "async:call - args", 1) assert:type_or_nil(args, "table", "async:call - args", 1)
f = capture_callstacks(f) f = capture_callstacks(f)
self:add(coroutine.create(f), args, callback, error_callback) return self:add(coroutine.create(f), args, callback, error_callback)
end end
--add an already-existing coroutine to the kernel --add an already-existing coroutine to the kernel
function async:add(co, args, callback, error_callback) function async:add(co, args, callback, error_callback)
table.insert(self.tasks, { local task = {
co, co,
args or {}, args or {},
callback or false, callback or false,
error_callback or false, error_callback or false,
}) }
table.insert(self.tasks, task)
return task
end end
local function process_resume(self, td, success, msg, ...) --remove a running task based on the reference we got earlier
local co, args, cb, error_cb = unpack(td) function async:remove(task)
task.remove = true
-- can't remove the currently running one from lists
if coroutine.status(task[1]) ~= "running" then
return table.remove_value(self.tasks, task)
or table.remove_value(self.tasks_stalled, task)
end
end
--separate local for processing a resume;
-- because the results come as varargs this way
local function process_resume(self, task, success, msg, ...)
local co, args, cb, error_cb = unpack(task)
--error? --error?
if not success then if not success then
if error_cb then if error_cb then
@ -80,7 +94,7 @@ local function process_resume(self, td, success, msg, ...)
end end
end end
--check done --check done
if coroutine.status(co) == "dead" then if coroutine.status(co) == "dead" or task.remove then
--done? run callback with result --done? run callback with result
if cb then if cb then
cb(msg, ...) cb(msg, ...)
@ -89,9 +103,9 @@ local function process_resume(self, td, success, msg, ...)
--if not completed, re-add to the appropriate queue --if not completed, re-add to the appropriate queue
if msg == "stall" then if msg == "stall" then
--add to stalled queue as signalled stall --add to stalled queue as signalled stall
table.insert(self.tasks_stalled, td) table.insert(self.tasks_stalled, task)
else else
table.insert(self.tasks, td) table.insert(self.tasks, task)
end end
end end
end end
@ -99,8 +113,8 @@ end
--update some task in the kernel --update some task in the kernel
function async:update() function async:update()
--grab task definition --grab task definition
local td = table.remove(self.tasks, 1) local task = table.remove(self.tasks, 1)
if not td then if not task then
--have we got stalled tasks to re-try? --have we got stalled tasks to re-try?
if #self.tasks_stalled > 0 then if #self.tasks_stalled > 0 then
--swap queues rather than churning elements --swap queues rather than churning elements
@ -110,10 +124,11 @@ function async:update()
return false return false
end end
end end
--run a step --run a step
--(using unpack because coroutine is also nyi and it's core to this async model) --(using unpack because coroutine is also nyi and it's core to this async model)
local co, args = unpack(td) local co, args = unpack(task)
process_resume(self, td, coroutine.resume(co, unpack(args))) process_resume(self, task, coroutine.resume(co, unpack(args)))
return true return true
end end
@ -172,4 +187,23 @@ function async.wait(time)
end end
end end
--make an iterator or search function asynchronous, stalling every n (or 1) iterations
--can be useful with functional queries as well, if they are done in a coroutine.
function async.wrap_iterator(f, stall, n)
stall = stall or false
n = n or 1
local count = 0
return function(...)
count = count + 1
if count >= n then
count = 0
if stall then
async.stall()
else
coroutine.yield()
end
end
return f(...)
end
end
return async return async