2023-01-31 15:35:04 -07:00
|
|
|
local uv = require('luv')
|
|
|
|
local MsgpackRpcStream = require('test.client.msgpack_rpc_stream')
|
|
|
|
|
|
|
|
--- Session class table. Objects built by Session.new use it as their
-- metatable, so methods are looked up here through __index.
local Session = {}
Session.__index = Session

-- Pick the protected-call implementation exposed as Session.safe_pcall.
if not package.loaded['jit'] then
  -- no LuaJIT module loaded: use the pcall provided by coxpcall
  Session.safe_pcall = require'coxpcall'.pcall
else
  -- luajit pcall is already coroutine safe
  Session.safe_pcall = pcall
end
|
|
|
|
|
|
|
|
--- Resume `co` and act on what it hands back.
-- If the coroutine has finished, its error (if any) is re-raised and nothing
-- else happens. If it yielded, the yielded value is called with `co`; callers
-- in this file yield functions that either arrange for the coroutine to be
-- resumed later or run a completion callback.
-- @param co   coroutine to resume
-- @param ...  values forwarded to coroutine.resume
local function resume(co, ...)
  local ok, yielded = coroutine.resume(co, ...)
  local state = coroutine.status(co)

  if state ~= 'dead' then
    -- still alive, so it must be parked on a yield
    assert(state == 'suspended')
    yielded(co)
    return
  end

  -- finished: propagate a failure, otherwise there is nothing left to do
  if not ok then
    error(yielded)
  end
end
|
|
|
|
|
|
|
|
--- Run `func` inside a fresh coroutine under Session.safe_pcall.
-- If the last vararg is a function it is treated as a completion callback
-- rather than an argument to `func`. After `func` returns (or fails) the
-- callback is invoked as on_complete(status, result, flag), where `status` is
-- the protected-call status and `result`, `flag` are the first two values
-- returned by `func` (or the error value when status is false).
--
-- The argument count is taken from select('#', ...) instead of the length
-- operator, because `#` is unspecified on a table with nil holes: a nil
-- argument must neither hide the trailing callback nor truncate the
-- arguments forwarded to `func`.
-- @param func  function to execute
-- @param ...   arguments for `func`, optionally followed by a callback
local function coroutine_exec(func, ...)
  local nargs = select('#', ...)
  local args = {...}
  local on_complete

  if nargs > 0 and type(args[nargs]) == 'function' then
    -- completion callback
    on_complete = args[nargs]
    args[nargs] = nil
    nargs = nargs - 1
  end

  resume(coroutine.create(function()
    local status, result, flag = Session.safe_pcall(func, unpack(args, 1, nargs))
    if on_complete then
      coroutine.yield(function()
        -- run the completion callback outside the coroutine: resume() calls
        -- this yielded function from the resumer's context
        on_complete(status, result, flag)
      end)
    end
  end))
end
|
|
|
|
|
|
|
|
--- Create a Session on top of `stream`.
-- @param stream  handed to MsgpackRpcStream.new
-- @return a table whose metatable is Session
function Session.new(stream)
  local session = setmetatable({}, Session)
  session._msgpack_rpc_stream = MsgpackRpcStream.new(stream)
  -- messages received but not yet handed to a consumer
  session._pending_messages = {}
  -- uv handles used by Session:_run to implement timeouts
  session._prepare = uv.new_prepare()
  session._timer = uv.new_timer()
  -- true only while Session:run is executing
  session._is_running = false
  return session
end
|
|
|
|
|
|
|
|
--- Return the next queued message, waiting for one if the queue is empty.
-- Messages are tables of the form {'request', method, args, response} or
-- {'notification', method, args}.
-- Raises 'Event loop already running' when called while Session:run is active.
-- @param timeout  forwarded to Session:_run; only a number arms the timer
--                 (presumably milliseconds, as for uv timers; confirm)
-- @return the oldest pending message, or nil if none arrived / session closed
function Session:next_message(timeout)
  if self._is_running then
    error('Event loop already running')
  end

  -- queue an incoming message and stop the loop so it can be returned
  local function enqueue(message)
    table.insert(self._pending_messages, message)
    uv.stop()
  end

  local function on_request(method, args, response)
    enqueue({'request', method, args, response})
  end

  local function on_notification(method, args)
    enqueue({'notification', method, args})
  end

  if #self._pending_messages == 0 then
    -- if closed, only return pending messages
    if self.closed then
      return nil
    end
    self:_run(on_request, on_notification, timeout)
  end

  return table.remove(self._pending_messages, 1)
end
|
|
|
|
|
|
|
|
--- Write `method` and its arguments to the RPC stream without a response
-- callback.
-- @param method  method name
-- @param ...     arguments, packed into a table
function Session:notify(method, ...)
  local params = {...}
  self._msgpack_rpc_stream:write(method, params)
end
|
|
|
|
|
|
|
|
--- Send a request and wait for its response.
-- While Session:run is active the calling coroutine is suspended until the
-- response arrives; otherwise the event loop is run until it arrives.
-- @param method  method name
-- @param ...     arguments, packed into a table
-- @return true, result on success; false, err when an error was reported
function Session:request(method, ...)
  local args = {...}
  local err, result

  if not self._is_running then
    err, result = self:_blocking_request(method, args)
  else
    err, result = self:_yielding_request(method, args)
  end

  if not err then
    return true, result
  end

  return false, err
end
|
|
|
|
|
|
|
|
--- Run the event loop, dispatching incoming messages to the given handlers.
-- Each handler is executed in its own coroutine through coroutine_exec.
-- Messages already queued are dispatched before the loop starts.
-- @param request_cb       called as request_cb(method, args); its first two
--                         return values are passed to response:send
-- @param notification_cb  called as notification_cb(method, args)
-- @param setup_cb         optional; run once before any message is dispatched
-- @param timeout          forwarded to Session:_run
function Session:run(request_cb, notification_cb, setup_cb, timeout)
  local function on_notification(method, args)
    coroutine_exec(notification_cb, method, args)
  end

  local function on_request(method, args, response)
    local function reply(status, result, flag)
      -- when the handler raised, `result` is the error value and the second
      -- argument is forced to true (presumably marking an error response;
      -- confirm against the response object's send)
      if not status then
        flag = true
      end
      response:send(result, flag)
    end
    coroutine_exec(request_cb, method, args, reply)
  end

  self._is_running = true

  if setup_cb then
    coroutine_exec(setup_cb)
  end

  -- drain whatever was queued before the loop was started
  while #self._pending_messages > 0 do
    local msg = table.remove(self._pending_messages, 1)
    local kind, method, args = msg[1], msg[2], msg[3]
    if kind ~= 'request' then
      on_notification(method, args)
    else
      on_request(method, args, msg[4])
    end
  end

  self:_run(on_request, on_notification, timeout)
  self._is_running = false
end
|
|
|
|
|
|
|
|
--- Stop the uv event loop by calling uv.stop().
function Session:stop()
  uv.stop()
end
|
|
|
|
|
|
|
|
--- Close the session: its uv handles, then the underlying RPC stream.
-- Afterwards `self.closed` is true, which makes Session:next_message return
-- only already-queued messages.
-- @param signal  forwarded to the RPC stream's close
function Session:close(signal)
  -- close a uv handle unless a close is already in progress
  local function close_handle(handle)
    if not handle:is_closing() then
      handle:close()
    end
  end

  close_handle(self._timer)
  close_handle(self._prepare)
  self._msgpack_rpc_stream:close(signal)
  self.closed = true
end
|
|
|
|
|
|
|
|
--- Send a request from inside a coroutine and suspend until the response.
-- The coroutine yields a function; resume() calls it with the coroutine, it
-- writes the request, and the response callback resumes the coroutine with
-- (err, result), which become this method's return values.
-- @param method  method name
-- @param args    table of arguments
-- @return err, result as delivered to the write callback
function Session:_yielding_request(method, args)
  local function send_then_resume(co)
    local function on_response(err, result)
      resume(co, err, result)
    end
    self._msgpack_rpc_stream:write(method, args, on_response)
  end

  return coroutine.yield(send_then_resume)
end
|
|
|
|
|
|
|
|
--- Send a request and run the event loop until its response arrives.
-- Requests and notifications received in the meantime are appended to
-- `self._pending_messages` for later consumption.
-- @param method  method name
-- @param args    table of arguments
-- @return err, result; err falls back to `self.eof_err` when the response
--         callback supplied none
function Session:_blocking_request(method, args)
  local err, result

  local function on_notification(method_, args_)
    local message = {'notification', method_, args_}
    table.insert(self._pending_messages, message)
  end

  local function on_request(method_, args_, response)
    local message = {'request', method_, args_, response}
    table.insert(self._pending_messages, message)
  end

  -- capture the response and stop the loop started by _run below
  local function on_response(e, r)
    err, result = e, r
    uv.stop()
  end

  self._msgpack_rpc_stream:write(method, args, on_response)
  self:_run(on_request, on_notification)

  return (err or self.eof_err), result
end
|
|
|
|
|
|
|
|
--- Run the uv loop while reading from the RPC stream.
-- Returns once something calls uv.stop(): a handler, the timeout timer, or
-- the end-of-stream callback. The prepare handle, the timer and stream
-- reading are stopped before returning.
-- @param request_cb       passed to the stream's read_start
-- @param notification_cb  passed to the stream's read_start
-- @param timeout          if a number, a one-shot timer with this timeout
--                         stops the loop; any other value means no timeout
function Session:_run(request_cb, notification_cb, timeout)
  if type(timeout) == 'number' then
    local function on_timeout()
      uv.stop()
    end

    -- The timer is armed from a prepare callback, which then stops the
    -- prepare handle so it runs only once (presumably so the timeout is
    -- measured from when the loop actually starts; confirm).
    local function arm_timer()
      self._timer:start(timeout, 0, on_timeout)
      self._prepare:stop()
    end

    self._prepare:start(arm_timer)
  end

  -- third read_start callback: stop the loop and remember the EOF error
  local function on_eof()
    uv.stop()
    self.eof_err = {1, "EOF was received from Nvim. Likely the Nvim process crashed."}
  end

  self._msgpack_rpc_stream:read_start(request_cb, notification_cb, on_eof)

  uv.run()

  self._prepare:stop()
  self._timer:stop()
  self._msgpack_rpc_stream:read_stop()
end
|
|
|
|
|
|
|
|
return Session
|