当前位置: 首页 > news >正文

【ceph学习】ceph如何进行数据的读写(2)

本章摘要

上文说到,librados/IoctxImpl.cc中调用objecter_op和objecter的op_submit函数,进行op请求的封装、加参和提交。
本文详细介绍相关函数的调用。

osdc中的操作

初始化Op对象,提交请求

设置Op对象的时间,oid,操作类型等信息。

//osdc/Objector.h// mid-level helpersOp *prepare_mutate_op(const object_t& oid, const object_locator_t& oloc,ObjectOperation& op, const SnapContext& snapc,ceph::real_time mtime, int flags,Context *oncommit, version_t *objver = NULL,osd_reqid_t reqid = osd_reqid_t(),ZTracer::Trace *parent_trace = nullptr) {Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |CEPH_OSD_FLAG_WRITE, oncommit, objver,nullptr, parent_trace);o->priority = op.priority;o->mtime = mtime;o->snapc = snapc;o->out_rval.swap(op.out_rval);o->out_bl.swap(op.out_bl);o->out_handler.swap(op.out_handler);o->out_ec.swap(op.out_ec);o->reqid = reqid;op.clear();return o;}
osdc/Objecter.cc
// read | write ---------------------------
/**
 * Public entry point for submitting an Op.
 *
 * Takes rwlock in shared mode, records an "op submit" trace event and hands
 * the Op to _op_submit_with_budget().
 *
 * @param op         the operation to submit
 * @param ptid       optional out-pointer for the tid; when null a local
 *                   placeholder is passed down instead
 * @param ctx_budget optional in/out budget, forwarded unchanged
 */
void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock shared_guard(rwlock, ceph::acquire_shared);

  // Callers may not care about the tid; give the lower layers somewhere
  // valid to write it anyway.
  ceph_tid_t scratch_tid = 0;
  if (ptid == nullptr) {
    ptid = &scratch_tid;
  }

  op->trace.event("op submit");
  _op_submit_with_budget(op, shared_guard, ptid, ctx_budget);
}
//
/**
 * Take throttle budget for an Op, arm its timeout if one is configured, and
 * pass it on to _op_submit().
 *
 * @param op         the operation to submit
 * @param sul        the caller's lock on rwlock, forwarded to the helpers
 * @param ptid       out-pointer for the tid, forwarded to _op_submit()
 * @param ctx_budget optional in/out budget; when it points at -1 it is
 *                   overwritten with the budget taken for this Op
 */
void Objecter::_op_submit_with_budget(Op *op,
                                      shunique_lock<ceph::shared_mutex>& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
{
  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    // The timeout callback identifies the Op by tid, so one must be
    // assigned before the timer event is registered.
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
                                      op_cancel(tid, -ETIMEDOUT);
                                    });
  }

  _op_submit(op, sul, ptid);
}
计算请求需要提交到哪个 OSD,并与之建立连接,便于后续将请求发送至对应的 OSD。
/**
 * Pick the target OSD for an Op, attach the Op to that OSD's session and
 * send it if possible.
 *
 * @param op   the operation; must not be attached to a session yet
 * @param sul  the caller's lock on rwlock
 * @param ptid optional out-pointer that receives the Op's tid
 */
void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  // Work out which OSD this op has to be submitted to.
  int r = _calc_target(&op->target, nullptr);
  switch(r) {
  case RECALC_OP_TARGET_POOL_DNE:
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Get the session for the chosen OSD.
  // NOTE(review): this excerpt has no retry path for a non-zero result
  // from _get_session(); fail loudly rather than dereference a null
  // session below.
  r = _get_session(op->target.osd, &s, sul);
  ceph_assert(r == 0);
  ceph_assert(s);

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
                   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  _session_op_assign(s, op);

  // Send the request.
  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

根据 OSD 的序号(即 osd id)建立 session 信息。

/**
 * Look up the OSDSession for an OSD id, creating and registering one if
 * none exists yet.
 *
 * A reference is taken on the returned session on behalf of the caller.
 *
 * NOTE(review): this excerpt never returns -EAGAIN and does not consult
 * @p sul; confirm against the full implementation whether creating a
 * session requires the lock to be promoted to write first.
 *
 * @param osd     OSD id to look up
 * @param session out-pointer that receives the session
 * @param sul     the caller's lock on rwlock (unused in this excerpt)
 * @returns 0 on success
 */
int Objecter::_get_session(int osd, OSDSession **session,
                           shunique_lock<ceph::shared_mutex>& sul)
{
  // Reuse an already registered session instead of overwriting (and
  // thereby orphaning) it with a freshly allocated one.
  auto p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    auto s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                   << s->get_nref() << dendl;
    return 0;
  }

  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                 << s->get_nref() << dendl;
  return 0;
}
构建osdop的message,发送请求
/**
 * Build the MOSDOp message for an Op and send it on the Op's session
 * connection, unless a backoff registered on the session covers the target
 * object, in which case the function returns without sending.
 *
 * @param op the operation; must already have a session and a non-zero tid
 */
void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    // Find the backoff interval that could contain hoid: step back to the
    // preceding entry unless hoid lies at or past that entry's end.
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  // Build the MOSDOp ("Message OSD Op") object.
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

构建 MOSDOp 消息,并设置必要的参数。

/**
 * Allocate and fill in the MOSDOp message for an Op.
 *
 * Besides building the message this clears op->target.paused, stamps
 * op->stamp with the current coarse monotonic time, post-increments
 * op->attempts, and updates the l_osdc_op_send / l_osdc_op_send_bytes
 * counters.
 *
 * @param op the operation to encode
 * @returns the newly allocated message
 */
Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
  flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  auto m = new MOSDOp(client_inc, op->tid,
                      hobj, op->target.actual_pgid,
                      osdmap->get_epoch(),
                      flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  // Fall back to the configured client priority when the Op has none.
  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  // Total payload carried by all sub-ops of this message.
  ssize_t sum = 0;
  for (const auto& sub_op : m->ops) {
    sum += sub_op.indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

发送请求

/**
 * Queue a message for sending on this connection.
 *
 * Blackholed connections drop the message. Loopback connections deliver it
 * locally through the dispatch queue, or drop it if the protocol is not
 * connected. Everything else is handed to the protocol object.
 *
 * @param m message to send; its reference is dropped with put() on the
 *          drop paths
 * @returns 0 on every path
 */
int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  if (is_blackhole()) {
    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
      << " blackhole " << *m << dendl;
    m->put();
    return 0;
  }

  // optimistic think it's ok to encode(actually may broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
#endif

  if (is_loopback) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // we don't want to consider local message here, it's too lightweight which
  // may disturb users
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

Protocol 有 V1 和 V2 两种实现;初始化时实际选择的是 ProtocolV2,该类继承自 Protocol。

/**
 * Enqueue a message on the connection's outgoing queue and, when the
 * connection state allows it, schedule the write handler on the
 * connection's event center.
 *
 * If the connection is CLOSED the message is dropped with put() instead.
 *
 * @param m message to send
 */
void ProtocolV2::send_message(Message *m) {
  uint64_t f = connection->get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  bool is_prepared = can_fast_prepare;
  // "features" changes will change the payload encoding
  if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    m->clear_payload();
    is_prepared = false;
    ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
                   << " != " << connection->get_features() << dendl;
  }
  if (state == CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                  << " type=" << m->get_type() << " " << *m << dendl;
    m->queue_start = ceph::mono_clock::now();
    m->trace.event("async enqueueing message");
    // Messages are queued per priority; is_prepared records whether the
    // payload was already encoded above.
    out_queue[m->get_priority()].emplace_back(
      out_queue_entry_t{is_prepared, m});
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    // Only schedule the write handler if no write is already in progress.
    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
      write_in_progress = true;
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}
请求发送线程处理
/**
 * Append an external event callback to the queue and wake the owning
 * thread if needed.
 *
 * The callback is not queued again when it is already the most recently
 * queued entry. wakeup() is called only when this is the first pending
 * external event and the caller is not the center's own thread.
 *
 * @param e callback to queue
 */
void EventCenter::dispatch_event_external(EventCallbackRef e)
{
  uint64_t pending = 0;
  {
    std::lock_guard guard{external_lock};
    const bool already_at_tail =
      external_num_events > 0 && external_events.back() == e;
    if (already_at_tail) {
      return;
    }
    external_events.push_back(e);
    pending = ++external_num_events;
  }

  if (pending == 1 && !in_thread()) {
    wakeup();
  }

  ldout(cct, 30) << __func__ << " " << e << " pending " << pending << dendl;
}
/**
 * Build the thread body for a worker.
 *
 * The returned callable renames the thread, marks the worker's event center
 * as owned by the calling thread, initializes the worker, then calls
 * process_events() in a loop until w->done is set, and finally resets and
 * destroys the worker.
 *
 * @param w worker the thread will drive; captured by pointer
 * @returns the callable to run as the thread's entry point
 */
std::function<void ()> NetworkStack::add_thread(Worker* w)
{
  return [this, w]() {
    rename_thread(w->id);
    // Upper bound, in microseconds, on a single process_events() wait.
    const unsigned EventMaxWaitUs = 30000000;
    w->center.set_owner();
    ldout(cct, 10) << __func__ << " starting" << dendl;
    w->initialize();
    w->init_done();
    while (!w->done) {
      ldout(cct, 30) << __func__ << " calling event process" << dendl;

      ceph::timespan dur;
      int r = w->center.process_events(EventMaxWaitUs, &dur);
      if (r < 0) {
        ldout(cct, 20) << __func__ << " process events failed: "
                       << cpp_strerror(errno) << dendl;
        // TODO do something?
      }
      w->perf_logger->tinc(l_msgr_running_total_time, dur);
    }
    w->reset();
    w->destroy();
  };
}
int EventCenter::process_events(unsigned timeout_microseconds,  ceph::timespan *working_dur)
{struct timeval tv;int numevents;bool trigger_time = false;auto now = clock_type::now();clock_type::time_point end_time = now + std::chrono::microseconds(timeout_microseconds);auto it = time_events.begin();if (it != time_events.end() && end_time >= it->first) {trigger_time = true;end_time = it->first;if (end_time > now) {timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end_time - now).count();} else {timeout_microseconds = 0;}}bool blocking = pollers.empty() && !external_num_events.load();if (!blocking)timeout_microseconds = 0;tv.tv_sec = timeout_microseconds / 1000000;tv.tv_usec = timeout_microseconds % 1000000;ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;std::vector<FiredFileEvent> fired_events;numevents = driver->event_wait(fired_events, &tv);auto working_start = ceph::mono_clock::now();for (int event_id = 0; event_id < numevents; event_id++) {int rfired = 0;FileEvent *event;EventCallbackRef cb;event = _get_file_event(fired_events[event_id].fd);/* note the event->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. 
*/if (event->mask & fired_events[event_id].mask & EVENT_READABLE) {rfired = 1;cb = event->read_cb;cb->do_request(fired_events[event_id].fd);}if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) {if (!rfired || event->read_cb != event->write_cb) {cb = event->write_cb;cb->do_request(fired_events[event_id].fd);}}ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd<< " mask is " << fired_events[event_id].mask << dendl;}if (trigger_time)numevents += process_time_events();if (external_num_events.load()) {external_lock.lock();std::deque<EventCallbackRef> cur_process;cur_process.swap(external_events);external_num_events.store(0);external_lock.unlock();numevents += cur_process.size();while (!cur_process.empty()) {EventCallbackRef e = cur_process.front();ldout(cct, 30) << __func__ << " do " << e << dendl;e->do_request(0);cur_process.pop_front();}}if (!numevents && !blocking) {for (uint32_t i = 0; i < pollers.size(); i++)numevents += pollers[i]->poll();}if (working_dur)*working_dur = ceph::mono_clock::now() - working_start;return numevents;
}

http://www.mrgr.cn/news/13729.html

相关文章:

  • 恶意软件分析与防御:网络安全技术的新篇章
  • Element-plus组件库基础组件使用
  • 解决Spring的代理失效问题
  • 零基础学习Redis(8) -- list类型命令使用
  • ubuntu jammy vagrant 国内源
  • [设计模式之抽象工厂模式—— 家具工厂]
  • 基于深度学习的游客满意度分析与评论分析【情感分析、主题分析】
  • Java设计模式之建造者模式详细讲解和案例示范
  • HTML沙漏爱心
  • SQL进阶技巧:如何查询最近一笔有效订单? | 近距离有效匹配问题
  • 云计算概述
  • Spring MVC常用注解及用法
  • 无人机挂载迫击炮吊舱设计技术详解
  • Selenium(HTML基础)
  • Vue 0_1项目实战
  • React -TS学习—— useRef
  • Swift concurrency 4 — Task和.task的理解与使用
  • Go 服务调试精解
  • ZooKeeper体系架构、安装、HA
  • [MySql]保姆级上手教程