The cls call flow in Ceph


This post walks through the cls call flow in Ceph (it works much like an RPC).

Take the GC flow as an example. After an object is deleted, the deletion record is written onto a gc object in the form of omap entries, and gc list reads those omap entries back. This feature is implemented with cls.
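
As an aside, since those records are plain omap entries, you can also read them back directly with librados. A minimal sketch, under the assumption of a default RGW zone where gc objects such as gc.0 live in the default.rgw.log pool under the gc namespace (the pool, namespace, user and entry count below are assumptions for illustration, not taken from the trace in this post):

#include <rados/librados.hpp>
#include <iostream>
#include <set>

int main() {
  librados::Rados cluster;
  cluster.init("admin");            // client.admin, placeholder user
  cluster.conf_read_file(nullptr);  // read ceph.conf from the default locations
  if (cluster.connect() < 0)
    return 1;

  librados::IoCtx io_ctx;
  cluster.ioctx_create("default.rgw.log", io_ctx);  // assumed pool holding the gc objects
  io_ctx.set_namespace("gc");                       // assumed namespace of the gc objects

  // list the omap keys on gc.0: each key is one pending gc entry
  std::set<std::string> keys;
  int rval = 0;
  librados::ObjectReadOperation op;
  op.omap_get_keys("", 1000, &keys, &rval);

  librados::bufferlist unused;
  if (io_ctx.operate("gc.0", &op, &unused) == 0)
    for (const auto& k : keys)
      std::cout << k << std::endl;

  cluster.shutdown();
  return 0;
}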

So let's step into the call flow with gdb:
gdb -args ./bin/radosgw-admin gc list  --include-all

From the source we know this is implemented in RGWGC::list, so set a breakpoint on that function:

(gdb) b RGWGC::list
Breakpoint 1 at 0x1257842: file /home/ceph/src/rgw/rgw_gc.cc, line 96.

(gdb) r
Starting program: /home/ceph/build/bin/radosgw-admin gc list --include-all
(gdb) bt
#0 cls_rgw_gc_list (io_ctx=..., oid="gc.0", marker="", max=1000, expired_only=false,
entries=empty std::__cxx11::list, truncated=0x7fffffff96d8, next_marker="") at /home/ceph/src/cls/rgw/cls_rgw_client.cc:974

#1 0x00005555567ab936 in RGWGC::list (this=0x5555583cc4c0, index=0x7fffffff9ca0, marker="", max=1000, expired_only=false, result=empty std::__cxx11::list, truncated=0x7fffffff96d8)
at /home/ceph/src/rgw/rgw_gc.cc:102

#2 0x000055555657cec8 in RGWRados::list_gc_objs (this=0x5555584d8000, index=0x7fffffff9ca0, marker="", max=1000,
expired_only=false, result=empty std::__cxx11::list, truncated=0x7fffffff96d8)
at /home/ceph/src/rgw/rgw_rados.cc:11378

#3 0x0000555555f6b13f in main (argc=4, argv=0x7fffffffda58) at /home/ceph/src/rgw/rgw_admin.cc:8048
(gdb)

cls_rgw_gc_list
At this point think of the operation as a client building a request: each cls has its own data structures, and they are encoded while the request is constructed.

int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
                    list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
{
  bufferlist in, out;

  cls_rgw_gc_list_op call;
  call.marker = marker;
  call.max = max;
  call.expired_only = expired_only;
  // pack the arguments
  encode(call, in);
  // which object, which cls module, which method, plus input and output buffers
  int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out);
  if (r < 0)
    return r;

  // out holds the reply
  cls_rgw_gc_list_ret ret;
  ...
  return r;
}
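
For reference, the request being encoded here is an ordinary versioned-encoding struct; a simplified sketch of it (the real definition lives in src/cls/rgw/cls_rgw_ops.h, only the fields used above are shown):

// simplified sketch of the gc list request struct
struct cls_rgw_gc_list_op {
  std::string marker;       // resume listing after this gc entry
  uint32_t max = 0;         // maximum number of entries to return
  bool expired_only = true; // only return entries whose expiration time has passed

  void encode(ceph::buffer::list& bl) const {
    ENCODE_START(2, 1, bl);   // struct version / compat version
    encode(marker, bl);
    encode(max, bl);
    encode(expired_only, bl);
    ENCODE_FINISH(bl);
  }

  void decode(ceph::buffer::list::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(marker, bl);
    decode(max, bl);
    decode(expired_only, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(cls_rgw_gc_list_op)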

Now let's see how io_ctx.exec is invoked:

int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method,
                          bufferlist& inbl, bufferlist& outbl)
{
  object_t obj(oid);
  return io_ctx_impl->exec(obj, cls, method, inbl, outbl);
}
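
This is the same entry point any librados application can use to call a cls method. Reusing an io_ctx opened as in the earlier sketch, a minimal call against Ceph's bundled hello demo class looks roughly like this (the object name is a placeholder; the gc call above follows the exact same pattern with RGW_CLASS / RGW_GC_LIST):

// assumes io_ctx is an open librados::IoCtx, set up as in the earlier sketch
librados::bufferlist in, out;

// class "hello", method "say_hello" ship with Ceph as a demo cls module;
// in and out are opaque bufferlists, just like the encoded gc structs above
int r = io_ctx.exec("myobject", "hello", "say_hello", in, out);
// on success, out holds whatever the cls method wrote (a greeting string for say_hello)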

Since this is a read operation, it ends up going through operate_read:

int librados::IoCtxImpl::exec(const object_t& oid, const char *cls, const char *method,
                              bufferlist& inbl, bufferlist& outbl)
{
  ::ObjectOperation rd;
  prepare_assert_ops(&rd);
  // build the op: record the class/method call
  rd.call(cls, method, inbl);
  // submit it as a read
  return operate_read(oid, &rd, &outbl);
}

Look at rd.call(cls, method, inbl):

void call(const char *cname, const char *method, bufferlist &indata) {
  // note: this is an op of type CEPH_OSD_OP_CALL
  add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
}

void add_call(int op, const char *cname, const char *method,
              bufferlist &indata,
              bufferlist *outbl, Context *ctx, int *prval) {
  OSDOp& osd_op = add_op(op);
  unsigned p = ops.size() - 1;
  out_handler[p] = ctx;
  out_bl[p] = outbl;
  out_rval[p] = prval;
  // which cls module
  osd_op.op.cls.class_len = strlen(cname);
  // which cls method
  osd_op.op.cls.method_len = strlen(method);
  osd_op.op.cls.indata_len = indata.length();
  osd_op.indata.append(cname, osd_op.op.cls.class_len);
  osd_op.indata.append(method, osd_op.op.cls.method_len);
  osd_op.indata.append(indata);
}
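
The outbl, ctx and prval slots recorded here are what allow a caller to collect a per-op result when several ops are batched into a single request. From the public API that looks roughly like this (assuming an open io_ctx; hello/say_hello are again the bundled demo class, the object name is a placeholder):

// one request carrying two ops against the same object: a stat plus a cls call
librados::ObjectReadOperation op;
librados::bufferlist in, call_out;
uint64_t size = 0;
time_t mtime = 0;
int call_rval = 0;

op.stat(&size, &mtime, nullptr);                          // plain read op
op.exec("hello", "say_hello", in, &call_out, &call_rval); // queued through add_call() above

librados::bufferlist unused;
int r = io_ctx.operate("myobject", &op, &unused);
// on success, call_out and call_rval hold the cls method's output and return code

Once the ops are queued, operate_read submits the whole ObjectOperation:
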
int librados::IoCtxImpl::operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl, int flags)
{
  if (!o->size())
    return 0;

  Mutex mylock("IoCtxImpl::operate_read::mylock");
  Cond cond;
  bool done;
  int r;
  version_t ver;

  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);

  int op = o->ops[0].op.op;
  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
                                                        *o, snap_seq, pbl, flags,
                                                        onack, &ver);
  // submit the op to the target OSD (how that OSD is chosen is covered in a later post)
  objecter->op_submit(objecter_op);
  // Objecter::_op_submit_with_budget
  // Objecter::_op_submit
  // Objecter::_send_op_account

  // (elided in this excerpt: wait on cond until the C_SafeCond callback above
  //  sets done and fills r with the operation's result)
  return r;
}
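
So operate_read is just a synchronous wrapper: it hands the op to the Objecter and blocks until the C_SafeCond callback fires. The public API exposes the same submit-then-wait pattern through aio_operate; a small sketch (again assuming an open io_ctx and a placeholder object name):

librados::ObjectReadOperation op;
librados::bufferlist in, out;
op.exec("hello", "say_hello", in);

// submit asynchronously, then decide ourselves when to block
librados::AioCompletion *c = librados::Rados::aio_create_completion();
io_ctx.aio_operate("myobject", c, &op, &out);
c->wait_for_complete();          // the explicit version of operate_read's internal wait
int r = c->get_return_value();
c->release();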

Now let's see how the OSD side handles it. Attach gdb to the OSD: every op it receives is processed in PrimaryLogPG::do_osd_ops:

int PrimaryLogPG::do_osd_ops() {  // excerpt: only the CEPH_OSD_OP_CALL branch is shown

  switch (op.op) {
  case CEPH_OSD_OP_CALL:
    {
      string cname, mname;
      // pull the cls arguments back out of the op
      bufferlist indata;
      try {
        bp.copy(op.cls.class_len, cname);
        bp.copy(op.cls.method_len, mname);
        bp.copy(op.cls.indata_len, indata);
      } catch (buffer::error& e) {
        .....
        break;
      }

      ClassHandler::ClassData *cls;
      result = osd->class_handler->open_class(cname, &cls);
      ceph_assert(result == 0); // init_op_flags() already verified this works.

      ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());

      bufferlist outdata;
      dout(10) << "call method " << cname << "." << mname << dendl;
      int prev_rd = ctx->num_read;
      int prev_wr = ctx->num_write;
      // run the cls method
      result = method->exec((cls_method_context_t)&ctx, indata, outdata);

      op.extent.length = outdata.length();
      osd_op.outdata.claim_append(outdata);
      dout(30) << "out dump: ";
      osd_op.outdata.hexdump(*_dout);
      *_dout << dendl;
    }
  }
}
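
Where does get_method find that function pointer? When the OSD opens a class, ClassHandler dlopen()s the matching plugin (a libcls_*.so under the OSD's class directory), and the plugin's init function registers its methods through the objclass API. A minimal sketch of what such a module looks like, with made-up class and method names rather than the real rgw code:

#include "objclass/objclass.h"

CLS_VER(1, 0)
CLS_NAME(demo)

// handles filled in by the registration calls below
static cls_handle_t h_class;
static cls_method_handle_t h_list;

// the function pointer that method->exec() ultimately invokes
static int demo_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  // decode *in, read object data / omap via the cls_cxx_* helpers,
  // then encode the reply into *out
  return 0;
}

// called by the OSD's ClassHandler when the plugin is loaded
CLS_INIT(demo)
{
  cls_register("demo", &h_class);
  cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, demo_list, &h_list);
}

After registration, open_class("demo") plus get_method("list") give back exactly the handle whose exec() wraps demo_list above.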

Summary:

The cls call flow rides on the normal IO path. The upper layer acts as the client: it encodes the cls arguments into a bufferlist (a ubiquitous data structure in Ceph) and ships them to the OSD as an op. The OSD then uses the class and method names carried in the call to look up the corresponding function pointer in memory and executes it. Looking back at it, this really is just an RPC-style call.