0%

在 Redis 的设计中,用户可以使用 Lua 脚本来完成拓展功能,实现一些 RESP 协议中不提供的操作,这些复杂功能往往需要操作多个键值对,并且操作具有上下文关系,如限流算法、简单事务等。

在 Redis 官方手册中的介绍中,使用 Lua 脚本具有如下优点:

  • 逻辑运行在服务端处而非客户端处,减少了 C/S 之间传输的网络延迟;
  • Lua 脚本独占服务器运行权,能够保证脚本执行的原子性;
  • 能够组合使用 Redis 支持的现有操作以完成更加复杂的逻辑。

在《Redis 设计与实现》一书中,作者详细地讲述了 Lua 脚本虚拟环境的搭建、Lua 脚本的运行流程;但是书中较少展示Redis 的源码,如果对 Lua 了解较少,可能会对该部分的具体实现过程较为困惑。本文将以 Redis 6.2.6 版本为例,介绍 Redis 中 Lua 脚本部分的实现。

Lua 脚本简介

Lua 是一种脚本语言,它使用标准 C 语言编写,具有很强的嵌入能力,能够作为“胶水语言”来为应用程序提高扩展性。除了 Redis 以外,Nginx 也同样适用了 Lua 来支持拓展功能。

Lua 语言的一个重要特性是它支持 C/Lua 函数之间的相互调用,这种相互调用是依赖于 Lua 虚拟机的栈特性实现的。下面以一个 demo 来简述 Lua 虚拟机中的栈特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static int l_Replace(lua_State *L) {
printf("Read From Lua: %s\n",luaL_checkstring(L,1)); // 从栈顶读取数据
lua_pushstring(L, "replaced"); // 返回给lua的值压栈
return 1; // 返回 1 代表运行成功
}

static int l_Origin(lua_State *L) {
return 1;
}

static int l_Fail(lua_State *L){
lua_pushstring(L,"fail");
return 0;
}

int main(int argc, char *argv[]) {
lua_State *L = luaL_newstate(); // 创建lua状态机
luaL_openlibs(L); // 打开Lua状态机中所有Lua标准库
lua_register(L, "replace", l_Replace);//注册C函数到lua
lua_register(L, "origin", l_Origin);//注册C函数到lua
lua_register(L, "fail", l_Fail);//注册C函数到lua

const char *testfunc = "print('lua output:',replace('original'))"
"print('lua output:',origin('original'))"
"print('lua output:',fail('original'))"; //lua中调用c函数
if (luaL_dostring(L, testfunc)) // 执行Lua命令。
printf("Failed to invoke.\n");

lua_close(L);
return 0;
}

这段函数的输出结果为:

1
2
3
4
Read From Lua: original
lua output: replaced
lua output: original
lua output:

当 Lua 中需要调用 C/C++ 函数时,需要使用 Lua 标准库中的lua_register函数将相应的函数注册到 Lua 虚拟机中。与传统的函数调用方式不同,Lua 中并不是依赖于 C/C++ 函数的返回值来进行值传递,而是在调用过程中创建一个栈,需要 C/C++ 函数将需要返回的值压入栈中,当该函数运行结束后,再将栈中的值作为函数的返回值;而 C/C++ 的返回值则用于表示是否执行成功,如果 return 0,则代表值 nil。

lua_stack

这种设计解决了 Lua 这种动态类型语言与 C/C++ 这种强类型语言之间值传递的问题,并且屏蔽了彼此之间的内存差异。但是这种方式使得 C/C++ 代码中需要嵌套大量操作 Lua 虚拟机的代码,具有一定的编程难度,比较好的一种做法是设置专门的模块作为中间层与 Lua 虚拟机进行交互,避免其他模块代码直接与 Lua 虚拟机进行耦合。这一思想在 Redis 源码中也有所体现。

Redis 中 Lua 虚拟环境的构建

虚拟环境的构建流程

Redis 中 Lua 虚拟机环境的创建是在scriptingInit函数中完成的,函数比较简单,将代码简化后,可以分为以下几个阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
void scriptingInit(int setup) {

// step1: 使用 liblua 来创建虚拟环境
lua_State *lua = lua_open();

if (setup) {
server.lua_client = NULL; // 清理 server 参数
server.lua_caller = NULL;
server.lua_cur_script = NULL;
server.lua_timedout = 0;
ldbInit();
}

// step2: 加载需要的库,并去除不需要的库
luaLoadLibraries(lua);
luaRemoveUnsupportedFunctions(lua);

// step3: 创建字典,用于保存 Lua 脚本
server.lua_scripts = dictCreate(&shaScriptObjectDictType,NULL);
server.lua_scripts_mem = 0;

// step4: 注册函数到 redis 哈希表中
// 创建哈希表
lua_newtable(lua);

// 注册 redis.call
lua_pushstring(lua,"call");
lua_pushcfunction(lua,luaRedisCallCommand);
lua_settable(lua,-3);

// 省略了重复的注册过程
...

// 将哈希表命名为 redis 并设置为全局变量
lua_setglobal(lua,"redis");

// step5: 更替 random 函数库
lua_getglobal(lua,"math");

lua_pushstring(lua,"random");
lua_pushcfunction(lua,redis_math_random);
lua_settable(lua,-3);

lua_pushstring(lua,"randomseed");
lua_pushcfunction(lua,redis_math_randomseed);
lua_settable(lua,-3);

lua_setglobal(lua,"math");

// 添加帮助函数,过程省略
...

// step6: 创建 fake 客户端
if (server.lua_client == NULL) {
server.lua_client = createClient(NULL);
server.lua_client->flags |= CLIENT_LUA;
server.lua_client->flags |= CLIENT_DENY_BLOCKING;
}

// step7: 设置全局保护
scriptingEnableGlobalsProtection(lua);

// step8: 保存 lua 环境
server.lua = lua;
}

初始化函数虽然较长,结构为线性结构,逻辑也比较简单,初始化函数中的几个阶段分别完成了如下工作:

  1. 使用 liblua 库函数创建 lua 虚拟机环境;
  2. 加载 Lua 基础函数库与表格库,并移除loadfiledofile函数,防止引发安全问题;
  3. 创建一个 dict 用于保存 sha-script;
  4. 在 Lua 中创建 redis 表,并将 C 函数注册到该表格中,从而实现redis.call接口;
  5. 更替 random 函数库,保证随机函数在不同主机上生成相同的序列;
  6. 创建一个 fake 客户端,用于交互 redis 数据库;
  7. 设置全局保护,禁止设置全局变量;
  8. 将已经完成设置的 Lua 环境保存到 server 结构体中。

全局变量保护的实现

全局变量保护是许多 Lua 虚拟环境中都会完成的工作,利用的原理也大都相似——通过修改 _ENV 表中 _G 表中 metatable 来实现。metatable 是 Lua 表中的一个特殊表,它的每个字段中都存储了控制当前表一些行为的函数,如 __index字段 控制索引操作、__add字段控制表的相加操作。当需要对一个表进行操作时,LVM 将会查询该表的 metatable 是否有相关控制字段,若有该控制字段,会按照字段中存储的函数来执行操作。

基于元表的这一原理,通过修改 _G 表中 metatable 的 __newindex 和 ``__index`字段,就可以实现对插入和读取全局变量的保护。Redis 中实现该行为所使用的 Lua 脚本为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
local dbg=debug
local mt = {}
-- 将 mt 赋值给 _G 的 metatable
setmetatable(_G, mt)
mt.__newindex = function (t, n, v)
if dbg.getinfo(2) then
local w = dbg.getinfo(2, "S").what
-- 如果不在 main 或 C 函数中,将不允许设置环境变量
-- Redis 环境中,用户脚本都是包裹在 f_sha 函数中运行的,不能创建全局变量
if w ~= "main" and w ~= "C" then
error("Script attempted to create global variable '"..tostring(n).."'", 2)
end
end
-- 除上述条件外,允许设置全局变量
-- 这是为了将用户脚本放入到全局变量中
rawset(t, n, v)
end
mt.__index = function (t, n)
-- 如果不在 main 或 C 函数中,将不允许访问不存在的环境变量
if dbg.getinfo(2) and dbg.getinfo(2, "S").what ~= "C" then
error("Script attempted to access nonexistent global variable '"..tostring(n).."'", 2)
end
return rawget(t, n)
end
debug = nil

由于 Redis 中所有的用户脚本都是被包裹在名为 f_sha 的函数中运行的,这里通过修改元表中的行为,不允许在非 main 以及非 C 条件下设置全局变量、访问不存在的全局变量;这样就实现了全局变量的保护。

redis.call 的实现

这里介绍一下redis.call()接口的实现。在 Lua 中,函数是一等公民,可以作为值被放入到哈希表中,然后从哈希表中取出并调用。redis.call()其实是一种 Lua 语法糖的写法,其原始的写法应该是redis['call'](),即从 redis 哈希表中取出键为 call 的函数值并调用。在环境初始化过程中,redis.call最终会绑定到luaRedisGenericCommmand函数上,该函数负责与 redis 数据库进行交互,Lua 环境下并不会直接与 C 环境下的 redis 数据库进行交互。

luaRedisGenericCommmand函数总长度约为 300 行,大部分逻辑为类型检查、异常处理等。剔除这些部分,仅仅保留主执行逻辑,我们可以把该函数分为以下几个阶段。函数中所有的参数检查都已经被删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
int luaRedisGenericCommand(lua_State *lua, int raise_error) {

// step1: 参数绑定阶段
// 从栈中获取传递的参数个数 argc ,并构建 argv
int j, argc = lua_gettop(lua);
struct redisCommand *cmd;
client *c = server.lua_client;
sds reply;
static robj **argv = NULL;
static int argv_size = 0;

if (argv_size < argc) {
argv = zrealloc(argv,sizeof(robj*)*argc);
argv_size = argc;
}

// 构建 agrv 数组
for (j = 0; j < argc; j++) {
char *obj_s;
size_t obj_len;
char dbuf[64];

// 依次解析栈中的参数,要求参数为 number 或 string 类型
if (lua_type(lua,j+1) == LUA_TNUMBER) {
lua_Number num = lua_tonumber(lua,j+1);
obj_len = snprintf(dbuf,sizeof(dbuf),"%.17g",(double)num);
obj_s = dbuf;
} else {
obj_s = (char*)lua_tolstring(lua,j+1,&obj_len);
if (obj_s == NULL) break; /* Not a string. */
}
}

/* Setup our fake client for command execution */
c->argv = argv;
c->argc = argc;
c->user = server.lua_caller->user;

/* Process module hooks */
moduleCallCommandFilters(c);
argv = c->argv;
argc = c->argc;

// step2: 获取 command,检查是否允许运行
cmd = lookupCommand(argv[0]->ptr);

// command 检查,主要可以分为以下几个步骤:
// 1.是否存在
// 2.command 是否允许运行
// 3.当前内存是否达到上限
// 4.检查随机函数与写函数
// 5.检查 ACL 与 Cluster 选项


// step3: 命令运行阶段
// 更新 server 状态
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (server.lua_replicate_commands) {

if (server.lua_repl & PROPAGATE_AOF)
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (server.lua_repl & PROPAGATE_REPL)
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
// core : 调用 redis 中的命令表
call(c,call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);

// step4: 命令结束阶段
// 将客户端收到的 RESP 格式结果转换为 Lua 格式结果,并送入 Lua 栈中。
redisProtocolToLuaType(lua,reply);

// step5: 清理阶段
cleanup:

// 清理 argc/argv 并尝试缓存。
...

// 错误处理,将 err 字段压入哈希表中
if (raise_error) {
/* If we are here we should have an error in the stack, in the
* form of a table with an "err" field. Extract the string to
* return the plain error. */

return luaRaiseError(lua);
}
return 1;
}

整体来说,该函数的逻辑也较为简单:从 Lua 栈中取出参数——调用 redis 命令表——将结果压入 Lua 栈中。函数中使用了一个 fake 客户端与数据库部分进行交互,因此luaRedisGenericCommand函数中必须要在 Lua 类型与 RESP 类型之间进行两次类型转换,但是这样的设计却有更多的优点:

  • 避免重写数据库操作函数,降低了工程量;
  • 共用操作函数,保障了 Lua 脚本与其他命令执行结果的一致性;
  • 加入了中间层,降低了 Lua 环境与 C 环境的耦合度;

由于 Lua 语言本身是不含错误处理的,因此 Redis 额外设置了 pcall 用于包裹命令的执行,通过操作 Lua 栈的形式来进行错误的传递。当 Lua 脚本中使用redis.pcall时,luaRedisGenericCommand函数中的参数raise_error会被设置为 1,如果在命令执行过程中发生了错误,那么将会在最终的清理阶段通过luaRaiseError函数主动将错误信息压入到 Lua 栈中,以此来达到错误传递的目的。

eval 族函数的一些实现

Redis 中 eval 族命令最终都会调用evalGenericCommand,该函数的细节部分较多,这里将函数分为几个部分来讲述,以下代码片段在非标明前提下均来自于evalGenericCommand函数。

脚本的存储与编译

Redis 服务器提供了 Lua 脚本的复用功能,所有的脚本都会以 f_sha 的形式命名并存储在字典以及 Lua 环境中,其中 sha 是脚本经过 sha1hex 算法后计算得到的 40 位字符串。在调用脚本时,Redis 会直接尝试在 Lua 环境中使用 f_sha 来查找函数,若未找到,才会进行脚本的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
funcname[0] = 'f'; funcname[1] = '_';
if (!evalsha) {
// 计算 sha 值
sha1hex(funcname+2,c->argv[1]->ptr,sdslen(c->argv[1]->ptr));
} else {
int j; char *sha = c->argv[1]->ptr;
for (j = 0; j < 40; j++)
funcname[j+2] = (sha[j] >= 'A' && sha[j] <= 'Z') ? sha[j]+('a'-'A') : sha[j];
funcname[42] = '\0';
}
/* Push the pcall error handler function on the stack. */
lua_getglobal(lua, "__redis__err__handler");

// 在 Lua 环境中查找脚本
lua_getglobal(lua, funcname);
if (lua_isnil(lua,-1)) {
lua_pop(lua,1);
if (evalsha) {
lua_pop(lua,1);
addReplyErrorObject(c, shared.noscripterr);
return;
}
// 进行脚本的初始化
if (luaCreateFunction(c,lua,c->argv[1]) == NULL) {
lua_pop(lua,1);
return;
}
// 重新获取脚本函数
lua_getglobal(lua, funcname);
serverAssert(!lua_isnil(lua,-1));
}

luaCreateFunction函数中,用户发送的脚本会被处理,并写入到 Lua 环境中的全局表中。假设用户所发送的 Lua 脚本内容为 function_body,那么最终将会被拼接为如下的形式:

1
2
function f_<sha>() function_body
end

如果对 Redis 手册较为熟悉,那么可以了解到所有的 Lua 脚本在服务器重启后都会失效,需要重新加载。这一特性从函数luaCreateFunction中可以了解到缘由。Lua 脚本在被创建时,会被注册到lua_scripts哈希表中,而这一哈希表对于 redis 来说是一个“二等公民”,它并不享有数据持久化的功能,在每一次重启时,redis 并不会自动导入之前已经载入的脚本。Lua 脚本相关的持久化功能只会被用于主从复制等场景。

脚本运行

Lua 脚本的运行在 C 代码中的体现比较少,在经过脚本的存储与编译后,只使用了如下代码来运行脚本:

1
2
3
4
5
6
7
8
9
10
11
12
// 将 KEYS 和 ARGV 作为全局变量放入 Lua 环境
luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys);
luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);

prepareLuaClient();

/* At this point whether this script was never seen before or if it was
already defined, we can call it. We have zero arguments and expect
a single return value. */
err = lua_pcall(lua,0,1,-2);

resetLuaClient();

由于脚本内容和输入参数已经被送入到了 Lua 虚拟机的栈中,这里直接使用了lua_pcall执行脚本,运行中需要访问数据库的操作将会使用已经注册到 Lua 环境中的 redis 族函数。若脚本在运行中发生错误,那么 Lua 栈顶部将会存储错误信息,在清理阶段将会收集信息并返回客户端。

超时检测

Redis 中所有用户命令的执行都是单线程串行的,为了防止单个 Lua 脚本运行时间过长阻塞服务端,Lua 模块中提供了脚本运行超时功能,当脚本运行超时后,用户可以使用 script kill 命令来强行结束脚本。这一功能是通过 Lua hook 来实现的,在evalGenericCommand函数中使用lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);代码将函数luaMaskCountHook作为 hook 注册到 Lua 环境中,每执行 100000 条语句,Lua 将强制执行一次该函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
// 计算脚本运行的事件
long long elapsed = elapsedMs(server.lua_time_start);
UNUSED(ar);
UNUSED(lua);
// 如果脚本运行超时
if (elapsed >= server.lua_time_limit && server.lua_timedout == 0) {
serverLog(LL_WARNING,
"Lua slow script detected: still in execution after %lld milliseconds. "
"You can try killing the script using the SCRIPT KILL command. "
"Script SHA1 is: %s",
elapsed, server.lua_cur_script);
server.lua_timedout = 1;
blockingOperationStarts();
// 防止其他部分代码关闭脚本调用客户端
protectClient(server.lua_caller);
}
if (server.lua_timedout) processEventsWhileBlocked();
if (server.lua_kill) {
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");

lua_sethook(lua, luaMaskCountHook, LUA_MASKLINE, 0);
// 在 Lua 环境下退出运行
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
lua_error(lua);
}
}

脚本的运行时间是由该函数进行计算的,当脚本运行超时时函数将会输出一条日志,并且允许其他客户端强行中断脚本。中断一个正在运行的脚本是有条件的,必须确保当前执行的脚本没有进行过写操作,否则可能会严重影响数据库的安全性,甚至让数据库处于中间状态。Redis 中是通过记录脚本运行状态来判断脚本是否执行过写操作的,即标识位server.lua_write_dirty。在 Lua 脚本准备执行写操作时,会将该标识位置为 1,此时将不会允许脚本被中断。

Redis 同样会在该 hook 函数中来执行中断脚本的逻辑。当脚本发生超时后,每次执行 hook 函数,都会调用一次特殊的事件处理函数processEventsWhileBlocked

Redis 的事务执行是单线程的,那么当 eventLoop 正在执行脚本时,为什么 Redis 还可以处理其余命令。Redis 并没有为 Lua 脚本单独开辟线程来运行,而是利用 Lua 环境中注册的 hook 函数来执行的。在 hook 函数中调用了processEventsWhileBlocked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void processEventsWhileBlocked(void) {
int iterations = 4; // 限制处理事务的时间

updateCachedTime(0);

ProcessingEventsWhileBlocked = 1;
while (iterations--) {
long long startval = server.events_processed_while_blocked;
long long ae_events = aeProcessEvents(server.el,
AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);

server.events_processed_while_blocked += ae_events;
long long events = server.events_processed_while_blocked - startval;
if (!events) break;
}

whileBlockedCron();
ProcessingEventsWhileBlocked = 0;
}

processEventsWhileBlocked每一次被调用时都会最多处理四个已经解析完毕的客户端命令。在阻塞状态下,Redis 服务器将会只允许一部分命令执行,这些命令不影响数据库的状态或者用于关闭正在运行的脚本,其余命令将会被拒绝执行。允许被执行的命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int processCommand(client *c) {
...
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
c->cmd->proc != resetCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
rejectCommand(c, shared.slowscripterr);
return C_OK;
}
...
}

在允许被执行的命令中,auth, hello, replconf, multi, discard, watch, unwatch, reset 都不会对数据库产生任何影响,其余的两个命令则是用于关闭脚本运行。

随机命令检测

Redis 对 Lua 脚本环境下的随机命令有所限制,根据官方手册:Redis 不允许在 Lua 脚本中随机命令发生在写命令前,单独的随机命令不受影响;这是为了防止同一份 Lua 脚本在不同实例中的执行结果不同。例如,一个脚本的执行逻辑是随机获取 100 个键,并将其中键值最小的 10 个键删除;由于每个实例中随机获取的键不同,最终可能会导致不同 Redis 实例的状态不同。但是先写入,后随机读取的情况则是允许的,例如先更新一个键,然后随机读取 10 个键;这不会造成 Redis 实例状态不同。

这一功能的实现原理同脚本中断,也是使用标识位来实现的。这一功能的实现位于luaRedisGenericCommmand函数中,处于获取命令后的检查阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int luaRedisGenericCommand(lua_State *lua, int raise_error) {

...

if (cmd->flags & CMD_WRITE) {
int deny_write_type = writeCommandsDeniedByDiskError();
if (server.lua_random_dirty && !server.lua_replicate_commands) {
luaPushError(lua,
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
goto cleanup;
}
// 其他检查
...
}
// 如果命令是写操作或随机操作,修改标识位
if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1;
if (cmd->flags & CMD_WRITE) server.lua_write_dirty = 1;

...
}

可以看到这一段函数的逻辑是在写操作中检查if(server.lua_random_dirty == 1),因此先执行写操作,后执行随机操作是被允许的。但是,Redis 中实现的这一冲突检查并没有判断写操作是否是依赖于随机操作的,哪怕写操作与随机操作之间毫无关系,同样也会被报错;例如先随机读取 100 个键,然后删除掉特定的键,这样的逻辑是不被允许的。

另外,值得注意的是,这段逻辑中只检查了 Lua 脚本中的 redis 命令是否为随机命令,并不会检查 Lua 脚本中是否使用了随机函数。这是因为在 Lua 环境初始化阶段将math.rand函数已经被更替为 redis 的实现;redis 的随机函数实现中,只要保证使用相同的随机数种子,就能够在不同的主机环境下生成相同的随机数序。因此能够确保 Lua 脚本中的随机函数在不同主机下运行结果相同,不需要进行检测。

Redis 这里的实现是非常巧妙的,除了 Redis 选择的这种方法外,还有其他的方式可以完成随机函数的检测。其一是直接通过扫描脚本内容来实现随机函数的检测,这种方法能够在脚本运行前完成推断,但是需要依赖于语义分析,不仅实现困难,而且运行性能比较差。其二是通过覆盖 Lua 环境中的随机函数,在调用随机函数时更改server.lua_random_dirty标识位,这种方法的性能损耗比较小,但是也有一个比较隐秘的缺点——相比 Redis 实现,当用户只使用 Lua 随机函数时,需要中断脚本的运行。Redis 的实现方法基本上是无副作用的,并且 Lua 环境中并没有使用与数据库相同的随机种子,这也在一定程度上保护了数据库的安全性,防止通过脚本来推断出 Redis 数据库使用的随机种子。

多机环境下的 Lua 脚本

在单机环境下,Redis 对 Lua 模块的限制较小,但是在多机环境下,为了保障数据库的安全性,Redis 对 Lua 模块加入了一些限制条件。

Replica 环境下的限制条件

Replica 环境下的限制条件主要有以下几点:

  • 随机命令与写命令的顺序问题:随机命令不能发生在写命令之前,即使写命令不依赖随机命令;
  • 脚本的加载时机与 evalsha 命令的传播问题;

考虑这样一种情况:在构建复制集之前,Master 节点已经载入了一个 Lua 脚本。由于 Lua 脚本是一个“二等公民”,脚本本身并不会被 AOF/RDB 持久化记录;因此在构建出的复制集中,Slave 节点是不存在 Lua 脚本的源文件的。如果此时客户端直接使用 evalsha 命令来调用 Lua 脚本,Slave 节点中将无法执行该脚本,因为脚本并未在 Slave 节点中完成初始化。这有可能会导致主从节点的状态不一致。

为了解决这一冲突,在主从复制模式下,evalsha 会被转义为 eval 命令在主从节点之间传递,即传递全部 Lua 脚本。如果脚本过大,可能会影响网络带宽。

Cluster 环境下的限制条件

禁止访问不同分片

Cluster 环境下的限制更为苛刻:脚本只允许同时被访问当前实例所负责的分片。一方面是由于 Redis Cluster 设计,避免脚本重放,另一方面则是为了规避分布式事务的复杂性。

在 Redis Cluster 中允许一个分片中存在一主多从来实现故障恢复,保证高可用。考虑如下一种情况,分片 A 目前存活一主一从,而分片 B 目前存活一主;如果 Lua 脚本被允许访问不同分片上的数据,那么当分片 A 的主节点执行完毕后,分片 B 的从节点也需要执行一次脚本,这就需要付出一些额外的检查措施来保证分片 B 中的数据只会访问一次。这将会大大增加系统的复杂度,很难保证这一功能在加入之后会 0 bug。

另一方面,Lua 脚本中是允许使用 MULTI 事务的,如果支持访问多个分片的数据就必然会引入分布式事务的问题。Redis 本身定位是一个弱事务的内存数据库,必然不可能支持这一特性。

Redis 是在脚本运行中检查键是否存在于当前分片的,检查发生在luaRedisGenericCommand函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int luaRedisGenericCommand(lua_State *lua, int raise_error) {

...

/* If this is a Redis Cluster node, we need to make sure Lua is not
* trying to access non-local keys, with the exception of commands
* received from our master or when loading the AOF back in memory. */
if (server.cluster_enabled && !server.loading &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
int error_code;
/* Duplicate relevant flags in the lua client. */
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
c->flags |= server.lua_caller->flags & (CLIENT_READONLY|CLIENT_ASKING);
// 检查的具体逻辑发生在 getNodeByQuery 函数中
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=
server.cluster->myself){
// 自身分片不可用
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
luaPushError(lua,
"Lua script attempted to execute a write command while the "
"cluster is down and readonly");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
luaPushError(lua,
"Lua script attempted to execute a command while the "
"cluster is down");
} else {
// 试图访问其他分片
luaPushError(lua,
"Lua script attempted to access a non local key in a "
"cluster node");
}
goto cleanup;
}
}

...
}

Redis 并不会在脚本运行前对所有需要访问的键进行检查,而是在运行中进行检查。因此,贸然访问不同分片上的值可能会让 Lua 脚本在运行中被中断,无法保证脚本逻辑的完整性。在分片模式下使用 Lua 脚本必须首先确保访问的所有键值对都处于同一分片,最好不要同时访问多个键。

在确定分片时,如果 key 值中存在{},那么会使用第一个{}中的值计算哈希槽;当集群模式下需要用 Lua 脚本来访问多个键时,可以将键值对以相同的开头命名,确保所有键值对都被分配到同一个哈希槽内。

禁止使用发布订阅

在 Lua 脚本中使用发布订阅同样也会因为脚本重放而引发一系列问题。由于 Cluster 模式中全局共享发布订阅频道,当脚本在主从节点之间传递时,会导致发布订阅命令被多次执行。

Redis Function

Redis Function 是 Redis 7.0 版本推出的全新功能,该功能是在原有 Lua 模块上的扩展与完善。Redis Function 将会被作为“一等公民”存储在数据库中,支持完整的持久化功能,这解决了 Lua 模块中的一些痛点。在使用 Redis 7.0 时,可以使用 Redis Function 来代替 Lua 模块。

Bolt 是一个完全由 go 写成的基于硬盘的 K/V 存储引擎,为了表示区分一般被称为 boltdb。对于一款存储引擎而言,boltdb 的源码非常短小精悍,包含测试代码仅有 17000 行左右。正是因为其体量比较小,boltdb 并不具备较为复杂的功能;在实现上,boltdb 主要借鉴了其他语言中的做法并将其改进,使之更加符合 go 语言的基础架构。

通读 boltdb 的代码,主要是为了学习作者是如何利用 go 语言的特性来实现数据库的,这是本文的重点内容。而 boltdb 的用法以及其中的基本概念,可以参考《自底向上分析 BoltDB 源码》

各模块实现方式

首先介绍一下 boltdb 中各个模块的实现方式:

  • 数据库文件落盘:使用 mmap 读取数据库文件,使用 write 接口写回脏页;
  • 空闲页面管理:使用链表/哈希表进行管理,按需取出,不够时重新 mmap 映射;
  • 数据库文件 compact:使用写缓冲,重写数据库文件;
  • 内存硬盘之间页面大小保持一致,直接映射;
  • 无日志,事务提交时直接落盘,使用读写锁控制并发。

可以看到 boltdb 中的各个模块实现都是比较简单的,这也在一定程度上保障了其稳定性。boltdb 无日志的设计使得写写之间是禁止并发的,因此 boltdb 可以很轻松地实现可串行级别的事务隔离。同时这也导致 boltdb 的写入性能非常差,只适合多读少写的场景。另外,boltdb 直接使用了 mmap 来映射数据库文件,这在一方面避免了 go 中的 GC 保证了数据库的性能,但另一方面当数据库大小超过内存时,可能会导致内存页面频繁被置换从而影响性能。但在 go 语言中大量使用指针可能会导致 GC 问题,使用 mmap 映射相当于避免了内存 B+ 树页面的 GC 扫描,这在 go 语言限制下应该是一种比较好的设计。

mmap 映射

mmap 映射实现

boltdb 使用 mmap 系统调用的源码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func mmap(db *DB, sz int) error {
// Map the data file to memory.
b, err := unix.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
if err != nil {
return err
}

// Advise the kernel that the mmap is accessed randomly.
err = unix.Madvise(b, syscall.MADV_RANDOM)
if err != nil && err != syscall.ENOSYS {
// Ignore not implemented error in kernel because it still works.
return fmt.Errorf("madvise: %s", err)
}

// Save the original byte slice and convert to a byte array pointer.
db.dataref = b
db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
db.datasz = sz
return nil
}

这部分内容很简单,将 mmap 映射获得的内存设置为只读,并且允许进程间共享;调用 madvise 将内存片段设置为随机读取模式,防止操作系统按照顺序读的方式来置换出内存页面。 最后,boltdb 将 mmap 获得的内存片段转换为了一个长度为maxMapSize的切片指针,这一个步骤主要是为了方便进行pagemeta等数据结构的映射。设置切片长度为maxMapSize是为了能够方便地完成 Golang 中的 unsafe 转换,保证转换长度足够。

如果成功获得内存映射片段,boltdb 还会调用mlock尝试禁止页面置换,从而提升数据库读性能。

1
2
3
4
5
6
7
8
9
10
func (db *DB) mmap(minsz int) error {
...
if db.Mlock {
// Don't allow swapping of data file
if err := db.mlock(fileSize); err != nil {
return err
}
}
...
}

虽然mlock调用会抛出 error,但是并不会影响数据库的正常处理流程。

boltdb 并没有使用 mmap 接口直接进行文件写入的操作,因为 mmap 的写入时机较为特殊,它不会立刻进行刷盘,而是等待内存页面被置换出时再进行刷盘,或者进程主动使用 madvise + msnyc 进行刷盘。这是因为操作系统并不确定进程是否会在未来的一段时间内再次对内存片段进行更新,因此采取惰性处理策略会更好。boltdb 中选择使用了writeAt接口进行文件写入。当一个写事务被提交时,它会将当前数据库文件中的所有脏页写回到硬盘中。

mmap 映射策略

随着数据库文件大小的增长,boltdb 从操作系统中获得的 mmap 内存大小可能会小于数据库文件的长度。这种情况发生时,boltdb 将会再次使用 mmap 来增加映射内存的长度。boltdb 每次进行内存映射时,映射的内存大小并不等于数据库文件的大小,而是根据一定策略来选择内存大小。选择映射内存大小的逻辑出现在 db.go 的mmapSize函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (db *DB) mmapSize(size int) (int, error) {
// Double the size from 32KB until 1GB.
for i := uint(15); i <= 30; i++ { // 数据库文件小于1GB时,每次映射的大小翻倍
if size <= 1<<i {
return 1 << i, nil
}
}
// Verify the requested size is not above the maximum allowed.
// maxMapSize == 0xFFFFFFFFFFFF
if size > maxMapSize {
return 0, fmt.Errorf("mmap too large")
}
// If larger than 1GB then grow by 1GB at a time.
sz := int64(size)
// maxMmapStep == 1 << 30
if remainder := sz % int64(maxMmapStep); remainder > 0 {
sz += int64(maxMmapStep) - remainder
}
// Ensure that the mmap size is a multiple of the page size.
// This should always be true since we're incrementing in MBs.
pageSize := int64(db.pageSize)
if (sz % pageSize) != 0 {
sz = ((sz / pageSize) + 1) * pageSize
}
// If we've exceeded the max size then only grow up to the max size.
if sz > maxMapSize {
sz = maxMapSize
}
return int(sz), nil
}

根据源码,我们可以得到 boltdb 中内存映射的策略具有两个阶段,即快速增长期与慢速增长期。当数据库文件小于 1GB 时,boltdb 的内存分配处于快速增长期,进行内存映射时会直接按照数据库文件大小向上取整为 2 的幂。当数据库文件大于 1GB 时,每一次进行内存映射时的内存大小较上一次增长 1GB。由于在慢速增长期,内存映射策略是步进的,可能会出现步进后的内存大小并不是页面大小(4 KB)的整数倍,这种情况下使用最后一个页面将会导致程序越界访问内存,导致程序崩溃;因此需要向上取整为页面的大小。

struct 零成本重建

boltdb 使用了直接映射的方式来完成一些数据结构的写入和重建,该方式并不会拷贝内存而是重新解释 unsafe pointer 来完成对象的重建,这些操作使用了 unsafe.go 中的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func unsafeAdd(base unsafe.Pointer, offset uintptr) unsafe.Pointer {
return unsafe.Pointer(uintptr(base) + offset)
}

func unsafeIndex(base unsafe.Pointer, offset uintptr, elemsz uintptr, n int) unsafe.Pointer {
return unsafe.Pointer(uintptr(base) + offset + uintptr(n)*elemsz)
}

func unsafeByteSlice(base unsafe.Pointer, offset uintptr, i, j int) []byte {

return (*[maxAllocSize]byte)(unsafeAdd(base, offset))[i:j:j]
}

// unsafeSlice modifies the data, len, and cap of a slice variable pointed to by
// the slice parameter. This helper should be used over other direct
// manipulation of reflect.SliceHeader to prevent misuse, namely, converting
// from reflect.SliceHeader to a Go slice type.
func unsafeSlice(slice, data unsafe.Pointer, len int) {
s := (*reflect.SliceHeader)(slice)
s.Data = uintptr(data)
s.Cap = len
s.Len = len
}

该方法可以用来进行简单对象的转换,相当于进行了一次对象的引用,重建后的对象地址会发生改变,但对象内容中的各个子对象地址都不会发生改变;下面使用一个 demo 来演示该方法是如何工作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Foo struct {
content1 int32
content2 int32
}

func (d *Foo) unsafeByteSlice() []byte {

return (*[unsafe.Sizeof(*d)]byte)(unsafe.Pointer(d))[0:unsafe.Sizeof(*d)]
}

func parseFromByteSlice(serialized []byte) *Foo {

return (*Foo)(unsafe.Pointer(&serialized[0]))
}

func ObjectSerializedTest() {
d := Foo{1, 2}

now := time.Now()

slice := d.unsafeByteSlice()
d.content2 = 3

dd := parseFromByteSlice(slice)
println("time cost:", time.Since(now).Nanoseconds(), "ns")
println("dd.content1:", dd.content1)
println("dd.content2:", dd.content2)
println("d address:", &d)
println("dd address:", &dd)
println("d content1 address:", &d.content1)
println("dd content2 address:", &dd.content1)
}

demo 先获取对象 d 的内存片段,然后修改对象 dd 的值,再使用该内存片段重建对象,程序输出结果为:

1
2
3
4
5
6
7
time cost: 162 ns
dd.content1: 1
dd.content2: 3
d address: 0xc000062f38
dd address: 0xc000062f50
d content1 address: 0xc000062f38
dd content2 address: 0xc000062f38

可以看到,dd 与 d 两个对象的地址是不同的,但是 dd.content1 与 d.content1 地址是相同的;因此即使在进行映射后更改 d 中的值,也会造成 dd 的改变。运用此方法进行 struct -> slice -> struct 耗时仅为 162ns,主要耗时集中在类型转换上。

接口设计

由于 boltdb 内部使用了读写锁,为了对读写事务加以区分,并给予用户较高的自由度,boltdb 设计了如下接口:

1
2
3
4
5
6
7
8
// 只读事务
func (db *DB) View(fn func(*Tx) error) error
// 写事务
func (db *DB) Update(fn func(*Tx) error) error
// 批处理事务
func (db *DB) Batch(fn func(*Tx) error) error
// 原始事务接口
func (db *DB) Begin(writable bool) (*Tx, error)

在事务的开启阶段,就可以简单地对事务进行区分,更好地处理读写并行关系。

空闲页索引

boltdb 的设计中,并不会直接在 B+树上进行修改,而是会先分配出一块缓冲区,写入缓冲区后再 merge 到 B+树上,即 COW。这种策略能够实现多读一些的并行。写事务的内存分配是由如下代码片段来实现的,该代码主要具有两个部分的功能,一部分是尝试用内存池中获取或者直接分配出一块缓冲区用于写入,另一部分则是在 B+树中找出合适的写入位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (db *DB) allocate(txid txid, count int) (*page, error) {
// 分配用于 COW 的内存
var buf []byte
if count == 1 {
buf = db.pagePool.Get().([]byte)
} else {
buf = make([]byte, count*db.pageSize)
}
p := (*page)(unsafe.Pointer(&buf[0]))
p.overflow = uint32(count - 1)

// 试图从当前 mmap 内存中找到可用的页
if p.id = db.freelist.allocate(txid, count); p.id != 0 {
return p, nil
}

// 如果未找到,说明数据库过大,需要调整 mmap 大小
p.id = db.rwtx.meta.pgid
var minsz = int((p.id+pgid(count))+1) * db.pageSize
if minsz >= db.datasz {
if err := db.mmap(minsz); err != nil {
return nil, fmt.Errorf("mmap allocate error: %s", err)
}
}

// 将之前的最后一页分配给事务(mmap 分配出的新页在该页之后)
db.rwtx.meta.pgid += pgid(count)

return p, nil
}

可以看到,freelist的功能只是为了调控当前事务需要再 B+树中写入的位置,而不直接管理内存。当事务需要写入时,必须分配一次内存,这样做的好处是不需要做内存池的管理了。当无法从freelist中获取可写的页时,代表当前 B+树已经无可用位置给事务写入,这并不代表当前 B+树已经写满了,可能是由于当前事务需要写入的数据量过大。具体如何去界定,是由不同的分配策略来决定的。

在 boltdb 中,使用freelist结构体来统一管理 mmap 获得的内存在当前时刻下的视图。该结构体只使用 pageid 来管理当前时刻的状态,但并不直接管理页面的内存,因为 boltdb 中使用的是 COW 机制,所有的写入操作并不会直接在 mmap 的内存上写入,而是先写入到其他内存上,等待写入完毕后再 merge 到 B+ 树中。这样是为了避免 mmap 的低效写入问题,并且更加具有安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type freelist struct {
freelistType FreelistType // freelist type
ids []pgid // all free and available free page ids.
allocs map[pgid]txid // mapping of txid that allocated a pgid.
pending map[txid]*txPending // mapping of soon-to-be free page ids by tx.
cache map[pgid]struct{} // fast lookup of all free and pending page ids.
freemaps map[uint64]pidSet // key is the size of continuous pages(span), value is a set which contains the starting pgids of same size
forwardMap map[pgid]uint64 // key is start pgid, value is its span size
backwardMap map[pgid]uint64 // key is end pgid, value is its span size
allocate func(txid txid, n int) pgid // the freelist allocate func
free_count func() int // the function which gives you free page number
mergeSpans func(ids pgids) // the mergeSpan func
getFreePageIDs func() []pgid // get free pgids func
readIDs func(pgids []pgid) // readIDs func reads list of pages and init the freelist
}

freelist以不同的视角来记录每一个页面上的可用范围,并根据写事务所需要的范围来分配具体的页面。在 boltdb 的实现中,freelist具有 FreelistArrayType 和 FreelistMapType 两种分配策略,前者会使用线性查找的方式来查询一块可用的内存页,而后者则使用索引的方式来查找可用的内存范围。FreelistMapType 分配策略的性能要远好于 FreelistArrayType。其具体做法是在归还的过程中将邻接页中可用的内存区域进行“合并”,并且这一块区域的整体大小注册到 freemaps 数据结构中。这样在查询可用区域的时候就不需要进行遍历,而是采用哈希表的方式进行查询。具体的细节可以参考该设计的博客文章

写入性能

归根结底,一个基于硬盘的数据库系统的性能瓶颈是硬盘的 IO 速度,在其他的数据库设计中,面对写入,都是先使用低成本的操作记录写入日志,然后再将写入内容搬运到具体的位置上,才能够获得较高的写入性能。而 boltdb 的设计中,并没有对写入操作进行任何的缓冲机制,而是直接将写入内容放入到硬盘中,这是其写入性能较低的根本原因。但是这种设计能够很简单地实现事务的串行,任何设计都有取舍,写入性能就是 boltdb 舍弃掉的东西。