Ceph Async Messenger

This post discusses the high-level architecture of the Ceph network layer, focusing on the Async Messenger code flow. Ceph currently has three messenger implementations: Simple, Async and xio. Async Messenger is by far the most efficient of the three. It can handle different transport types (posix, rdma, dpdk), and it multiplexes connections over a small, fixed pool of worker threads plus an event-polling system to achieve high concurrency. The event driver is epoll, kqueue or select, depending on what the platform provides.
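
As a point of reference for what "a few threads polling many sockets" looks like, here is a minimal, self-contained epoll loop in the same spirit. This is generic Linux socket code for illustration only (the port and buffer sizes are arbitrary), not Ceph source.

//generic epoll loop (illustration only, not Ceph code)
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

int main() {
  int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  sockaddr_in sa{};
  sa.sin_family = AF_INET;
  sa.sin_port = htons(6800);                       // arbitrary port for the example
  bind(listen_fd, (sockaddr*)&sa, sizeof(sa));
  listen(listen_fd, 128);

  int epfd = epoll_create1(0);
  epoll_event ev{};
  ev.events = EPOLLIN;
  ev.data.fd = listen_fd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);  // watch the listening socket

  epoll_event fired[64];
  while (true) {                                   // one thread multiplexes many sockets
    int n = epoll_wait(epfd, fired, 64, 30000);
    for (int i = 0; i < n; ++i) {
      if (fired[i].data.fd == listen_fd) {         // new connection: register it too
        int cli = accept4(listen_fd, nullptr, nullptr, SOCK_NONBLOCK);
        epoll_event cev{};
        cev.events = EPOLLIN;
        cev.data.fd = cli;
        epoll_ctl(epfd, EPOLL_CTL_ADD, cli, &cev);
      } else {                                     // data (or hangup) on a connection
        char buf[4096];
        ssize_t r = read(fired[i].data.fd, buf, sizeof(buf));
        if (r <= 0) close(fired[i].data.fd);
        // else: feed the bytes to a per-connection state machine
      }
    }
  }
}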

[Figure: Async Messenger architecture (Async.jpg)]

Async Messenger comprises three main components (a simplified sketch of how they fit together follows the list):

  1. Network Stack – initializes stack based on Posix, RDMA, DPDK options
  2. Processor – handles socket layer communication
  3. AsyncConnection – maintains connection state machine
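
As promised, here is a rough sketch of how these pieces relate. The class names match Ceph's, but the declarations are boiled down for illustration; the real ones live under src/msg/async/ (Stack.h, Event.h, AsyncMessenger.h, AsyncConnection.h).

//simplified relationships, not the real headers
#include <vector>

class EventCenter     { /* wraps epoll/kqueue/select: create_file_event(), process_events() */ };
class Worker          { public: EventCenter center; /* one event-loop thread per worker */ };
class NetworkStack    { std::vector<Worker*> workers; /* posix, rdma or dpdk flavour */ };
class Processor       { Worker *worker; /* bind/listen/accept on behalf of the messenger */ };
class AsyncConnection { /* per-connection state machine, driven by a Worker's EventCenter */ };
class AsyncMessenger  { NetworkStack *stack; std::vector<Processor*> processors; };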

To understand the code flow, let's look at the client-server model, i.e. from the perspective of how a connection is established between a RADOS client and an OSD. Starting with the OSD, three kinds of messengers are created: public, cluster and heartbeat:

//ceph_osd.cc
int main() {
 ...
 Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,..)
 Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,..)
 Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,..)
 ...
}
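
Messenger::create is a small factory that selects the implementation from the type string ("simple", "async+posix", "async+rdma", "async+dpdk", driven by the ms_type option). A condensed paraphrase, with version-specific arguments elided:

//Messenger.cc (condensed paraphrase; the exact argument list varies across releases)
Messenger *Messenger::create(CephContext *cct, const std::string &type, ...)
{
  if (type == "simple")
    return new SimpleMessenger(cct, name, std::move(lname), nonce);
  else if (type.find("async") != std::string::npos)   // "async+posix", "async+rdma", "async+dpdk"
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
  return nullptr;
}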

Network Stack

When an AsyncMessenger instance is created, it initializes the network stack. Based on the messenger type parameter, a Posix (or RDMA/DPDK) transport stack is created. Heap memory for the Processors (more on these below) is also allocated here, based on the number of workers.

//Stack.cc
NetworkStack::NetworkStack(CephContext *c, const string &t)
  : type(t), started(false), cct(c) {
  ...
  for (unsigned i = 0; i < num_workers; ++i) {
    Worker *w = create_worker(cct, type, i);
    w->center.init(InitEventNumber, i, type);
    workers.push_back(w);
  }
  ...
}
//worker thread loop (NetworkStack::add_thread):
//  while (!w->done) { ... w->center.process_events(EventMaxWaitUs, &dur); ... }

The number of worker threads depends on the ms_async_op_threads config value. For every worker, an EventCenter is initialized; this class abstracts the various event drivers (epoll, kqueue, select), so each worker ends up with its own epoll (or kqueue/select) instance. Each worker thread then sits in center.process_events(), waiting for events to handle.
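
The driver abstraction behind EventCenter is worth a quick look. The sketch below is modelled on the real EventDriver / EpollDriver interface in Event.h and EventEpoll.cc, but it is simplified for illustration and is not the actual Ceph code:

//illustrative sketch of the event-driver abstraction (simplified)
#include <sys/epoll.h>
#include <sys/time.h>
#include <vector>

constexpr int EVENT_READABLE = 1;
constexpr int EVENT_WRITABLE = 2;

struct FiredFileEvent { int fd; int mask; };

class EventDriver {                              // one backend per platform
public:
  virtual ~EventDriver() = default;
  virtual int init(int nevent) = 0;
  virtual int add_event(int fd, int cur_mask, int add_mask) = 0;
  virtual int event_wait(std::vector<FiredFileEvent> &fired, struct timeval *tp) = 0;
};

class EpollDriver : public EventDriver {         // the Linux backend
  int epfd = -1;
  std::vector<epoll_event> events;
public:
  int init(int nevent) override {
    events.resize(nevent);
    epfd = epoll_create1(0);
    return epfd < 0 ? -1 : 0;
  }
  int add_event(int fd, int cur_mask, int add_mask) override {
    epoll_event ee{};
    ee.events = ((cur_mask | add_mask) & EVENT_READABLE ? EPOLLIN : 0) |
                ((cur_mask | add_mask) & EVENT_WRITABLE ? EPOLLOUT : 0);
    ee.data.fd = fd;
    int op = cur_mask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    return epoll_ctl(epfd, op, fd, &ee);
  }
  int event_wait(std::vector<FiredFileEvent> &fired, struct timeval *tp) override {
    int timeout_ms = tp ? tp->tv_sec * 1000 + tp->tv_usec / 1000 : -1;
    int n = epoll_wait(epfd, events.data(), (int)events.size(), timeout_ms);
    for (int i = 0; i < n; ++i) {
      int mask = (events[i].events & EPOLLIN  ? EVENT_READABLE : 0) |
                 (events[i].events & EPOLLOUT ? EVENT_WRITABLE : 0);
      fired.push_back({events[i].data.fd, mask});
    }
    return n < 0 ? -1 : n;
  }
};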

Once this is all set up, the OSD needs to bind its addresses and listen on the socket for incoming connections.

//ceph_osd.cc
int main(){
  ...
  if (ms_public->bindv(public_addrs) < 0)
    forker.exit(1);
  if (ms_cluster->bindv(cluster_addrs) < 0)
    forker.exit(1);
  ...
  ms_public->start();
  ms_cluster->start();
  ms_objecter->start();
  // start osd
  err = osd->init();
}

Processor & AsyncConnection

Socket operations such as bind, listen and accept are handled by the Processor class, which is instantiated in AsyncMessenger's constructor. The bindv() calls in ceph_osd.cc above end up in Processor's bind method. After the address and port are bound, it starts listening on the socket and waits for incoming connections.

//AsyncMessenger.cc
int Processor::bind(const entity_addrvec_t &bind_addrs,...){
  ...
 if (listen_addr.get_port()) {
    worker->center.submit_to(worker->center.get_id(),
                           [this, k, &listen_addr, &opts, &r]() {
        r = worker->listen(listen_addr, opts, &listen_sockets[k]);
      }, false);
  }
}
//PosixStack.cc
int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
                        ServerSocket *sock){
  ...
  r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
  ...
  r = ::listen(listen_sd, cct->conf->ms_tcp_listen_backlog);
  ...
}

After bind and listen, the OSD itself is started via the osd->init() call in ceph_osd.cc. This eventually calls into Processor's start method, which adds the listen file descriptor to epoll with Processor::accept as the callback (wrapped in listen_handler). From then on, every time a new connection arrives, the event driver wakes the worker and the accept callback runs.

//AsyncMessenger.cc
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}

void Processor::start()
{
  // start thread
  worker->center.submit_to(worker->center.get_id(), [this]() {
    for (auto& l : listen_sockets) {
      if (l) {
        worker->center.create_file_event(l.fd(), EVENT_READABLE,
                                         listen_handler);
      }
    }
  }, false);
}
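
When the listen fd becomes readable, listen_handler fires and Processor::accept drains all pending connections, picks a worker and hands each new socket to the messenger. Roughly, as a simplified paraphrase (worker selection and error handling differ slightly between Ceph releases):

//AsyncMessenger.cc (simplified paraphrase of Processor::accept)
void Processor::accept()
{
  SocketOptions opts;                              // nodelay, receive buffer size, priority
  for (auto& listen_socket : listen_sockets) {
    while (true) {
      entity_addr_t addr;
      ConnectedSocket cli_socket;
      Worker *w = msgr->get_stack()->get_worker(); // pick a (lightly loaded) worker
      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
      if (r < 0)
        break;                                     // EAGAIN: no more pending connections
      // wrap the socket in an AsyncConnection and register it with w's EventCenter
      msgr->add_accept(w, std::move(cli_socket), listen_socket.get_addr(), addr);
    }
  }
}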

Network connections between clients and servers are maintained via the AsyncConnection state machine. When the Processor accepts a connection, it calls into AsyncConnection::accept, which sets the initial state to STATE_ACCEPTING. The connection's read_handler is then dispatched as an external event and begins processing the connection:

//AsyncConnection.cc
void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr){
  ...
  state = STATE_ACCEPTING;
  center->dispatch_event_external(read_handler);
  ...
}

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

void AsyncConnection::process() {
  ...
}
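
process() essentially dispatches on the current connection state. The fragment below is a heavily condensed illustration of that pattern only; the real state set (and, in newer releases, the ProtocolV1/V2 split) is considerably richer:

//AsyncConnection.cc (heavily condensed illustration, not the real state list)
void AsyncConnection::process() {
  switch (state) {
  case STATE_ACCEPTING:
    // exchange banner and handshake with the peer, then move on
    state = STATE_OPEN;
    break;
  case STATE_OPEN:
    // read_until()-style helpers pull bytes off the non-blocking socket;
    // once a full message is decoded it is queued for the dispatchers,
    // then we wait for the next EVENT_READABLE from the EventCenter
    break;
  case STATE_CLOSED:
  default:
    break;
  }
}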

On the client side, RadosClient only creates and starts a messenger. An actual connection to an OSD is established only when an operation is submitted to that OSD through the Objecter module.

//RadosClient.cc
int librados::RadosClient::connect(){
  ...
  messenger->add_dispatcher_tail(this);
  messenger->start();
  ...
}

//Objecter.cc
void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid){
  ...
  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  ...
}

int Objecter::_get_session(...){
  ...
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  ...
}
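
Seen from an application, this lazy setup looks like the sketch below: cluster.connect() only starts the client messenger, and the first AsyncConnection to an OSD is created when the write reaches Objecter::_op_submit. The pool and object names are placeholders; build with -lrados.

//minimal librados client (sketch; pool/object names are placeholders)
#include <rados/librados.hpp>
#include <iostream>

int main() {
  librados::Rados cluster;
  cluster.init("admin");                         // client.admin
  cluster.conf_read_file("/etc/ceph/ceph.conf"); // monitors, keyring, ms_type, ...
  if (cluster.connect() < 0) {                   // RadosClient::connect(): messenger->start()
    std::cerr << "connect failed\n";
    return 1;
  }

  librados::IoCtx io;
  cluster.ioctx_create("test-pool", io);         // placeholder pool name

  librados::bufferlist bl;
  bl.append("hello");
  io.write_full("test-object", bl);              // first op: Objecter picks the OSD and
                                                 // connect_to_osd() builds the AsyncConnection
  cluster.shutdown();
  return 0;
}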

This completes connection establishment between a RADOS client and an OSD server. We did not delve into finer details such as policy parameters, the heartbeat messengers, the EventCenter internals and more; those are worth exploring and piecing together for a broader understanding. Perhaps in another post!
