libphenom 源码笔记

看了下Facebook前段时间放出来的libphenom,简单总结一下。

【转注】:libPhenom是Facebook发布的一个C语言事件框架,用于构建高性能和高可扩展的系统。支持多线程、提供内存管理和常用数据结构、json处理。

网络相关

libphenom提供了自定义的socket描述符ph_socket_t和通用的地址结构phenom_sockaddr。 ph_sock结构封装了读写buffer、用于NBIO的job结构、超时时长、事件发生后的callback等信息。 被enable的ph_sock将由NBIO pool管理。

监听和连接都是封装成job来异步化:

  • ph_socket_connect(s, addr, timeout, func, arg)
    • s上发起对addr的异步连接,超时时间timeout,结束后(可能连接失败)调用func(arg)。这个callback的类型为ph_socket_connect_func,若出错则status为对应的errno
  • ph_sock_resolve_and_connect(),解析域名并发起连接。
    • 根据resolver参数指定的resolver解析域名。如果是PH_SOCK_CONNECT_RESOLVE_SYSTEM,调用ph_dns_getaddrinfo(),将解析相关的数据、callback封装为job后通过ph_job_set_pool()加入DNS线程池。如果是PH_SOCK_CONNECT_RESOLVE_ARES则调用ARES库解析。
  • 一个listener的job被调度时,调用其callback accept_dispatch()。这个函数通过accept4()accept()来接收新连接,并对客户端socket调用listener上的acceptor()

以通过getaddrinfo()解析域名为例,

Job

Job的定义ph_job_def包含callback、memtype和destructor。 新创建的job会根据其定义中的memtype来分配内存,并设置callback。 每种具体的job,例如ph_listener_t,都有对应的ph_job_def

创建一个job时,调用ph_job_alloc(),传入job对应的定义来获取和初始化动态分配的对象。

线程池

Phenom线程上记录了

  • pending NBIO job 队列
  • pending pool job 队列
  • pthread_t 线程id
  • 在pool中的结点
  • name

每个phenom线程分配一个全局唯一的id,对应一个pthread线程。 如注释所说,tid < MAX_RINGS的phenom线程称为preferred thread, 拥有自己专用的job队列,其他线程竞争共享队列,用spinlock同步。

全局的pools将所有线程池保存在链表中。其中包含用于consumer和producer等待/唤醒的结构(futex或condition variable), 保存job的ring buffer、worker线程的指针等等信息。

ph_thread_spawn(func, arg)创建一个ph_thread_t线程。 实际上是调用pthread_create(),让其执行ph_thread_boot(),将实际要执行的函数func() 和参数arg等信息传入。ph_thread_boot()会分配内存并创建一个新的ph_thread_t结构, 执行一些初始化,然后调用传入的那个func()

此外,封装了join、self、setaffinity等等pthread操作。

NBIO

用timer wheel管理timer,设置wheel interval 100ms,初始100ms。

全局的timer_job,监视timerfd上的读事件,触发tick_epoll(),使timer wheel往前跳一个tick,处理当前的定时任务。

ph_nbio_init()初始化NBIO。

  • calloc()分配num_schedulers个emitter
  • 初始化每个emitter及其timer wheel(ph_timerwheel_init()ph_nbio_emitter_init()
    • timer_fd描述符(timerfd_create()
    • timer_job扔进pending_nbio队列,这个job被调度到时执行tick_epoll
  • 初始化相关的counter

ph_sched_run()启动job调度。

  • emitters中的phenom线程emitters[1 .. num_threads]执行sched_loop()emitters[0]ph_sched_run()的调用者
    • 设置is_worker
    • 设置CPU affinity
    • 设置线程名
    • epoll_emitter()
  • 启动worker。对pools中每个线程池,其中每个线程pool->threads[0 .. pool->max_workers]执行worker_thread()
    • worker线程设置自己的is_worker标记
    • 根据自己的tid选出对应的buffer,设置CPU affinity
    • 进入循环,不断地弹出job(pop_job()),执行job的callback。弹出job的时候,首先根据tid尝试自己的队列,否则从其它线程的队列里取,还取不出则wait_pool()
    • 检查pending_poolpending_nbio是否还有deferred job
      • pending_pool中的job根据当前线程的tid加入线程池对应的buffer,并唤醒等待的consumer(wake_pool()
      • pending_nbio中的job加入到epoll中(_ph_job_set_pool_immediate()process_deferred()
    • 若有等待的producer,唤醒之
  • 启动gc_job
  • sched_loop()

string

代码里面有很长的一段注释描述了大致的设计和使用方法。其中提到了三个设计目标:

字符串分为stack-based和embedded两种

  • stack-based
    • 固定大小的buffer
      • PH_STRING_DECLARE_STACK(name, size)
      • memtype为PH_STRING_STATIC
    • 可增长的buffer
      • PH_STRING_DECLARE_GROW(name, size, memtype)
      • memtype为PH_STRING_GROW_MT(mt),即-mt。这里用负的memtype表示stack-based growable,正的memtype表示heap-allocated growable
  • embedded
    • 在结构中要配合另一个buffer成员来使用

栈上固定大小的buffer不能动态地扩容,只能用到定义时指定的大小空间。 栈上静态分配的可增长buffer在空间不够用时会根据指定的memtype进行动态分配,并将memtype改为正数。

通过引用计数来跟踪动态分配的buffer。引用计数到0的时候(ph_string_delref())会根据memtype释放掉 动态分配的堆内存。

内存管理

支持用户自定义的、命名的对象类型,其中指定了这一类型需要分配的大小和一些标记, 例如返回前清零(PH_MEM_FLAGS_ZERO)。全局的memtypes管理了已注册的所有memtype, 预分配1024个memtype指针。

注册后(ph_memtype_register())得到一个类型为ph_memtype_t的“描述符”, 以后引用这个类型的时候使用的都是这个“描述符”。

分配和释放的各个接口:

  • 分配固定大小:ph_mem_alloc(mt),根据mt中记录的类型大小来分配
  • 分配不定大小:ph_mem_alloc_size(mt, size),分配size + HEADER_RESERVATION。这个header中记录了本次分配的大小和memtype
  • 释放:ph_mem_free(mt, ptr),根据memtype或header来释放

ph_mem_stats类型和相应的接口来维护、查询某个类型的内存分配信息。

 

收藏 评论

相关文章

可能感兴趣的话题



直接登录
跳到底部
返回顶部