或許在生活中大家都討厭定時器,比如周一早上的鬧鐘、承諾老板第二天一早給報告的 deadline;但是在代碼的世界里,定時器扮演著不可或缺的角色:定時任務、超時判斷、幀同步等等。
那定時器的本質是什么?我們使用的定時能力背后又暗藏著什么玄機,請繼續往下看。
注意:由于博客系統問題導致排版有點異常,接受不了的可以看 https://github.com/vorshen/blog/blob/master/timer/index.md
目錄
定時能力需要什么
信號
POSIX Timer
多路復用
定時能力需要什么
javascript 的定時器能力應該是使用最為方便,默認的上下文捕獲,函數式編程。
1 2 3 4 5 |
setTimeout(function() { console.log('利利喝多了'); }, 3000); console.log('利利噸噸噸'); |
我們可以把 setTimeout 的執行,拆解一下,主要是以下的流程。
主要有三個環節:
- 存放 callback
- 啟動一個倒計時
- 倒計時結束,取出存好的 callback,RUN!
BTW: JS 中定時器這么方便,不僅僅是 v8 的功勞,還需要執行環境 (eg: chrome、node) 給予支持。如果用 d8 去調試,會發現 setTimeout 并沒有定時執行。
核心需要解決 1,2 兩個問題,先看存放 callback,這里總結一下存放的特點:
- 上層設置定時任務的順序是不確定的,而最終的執行是有順序的,這里涉及到排序行為
- 設置定時器的動作可能是多次的
滿足由上條件,我們可以使用一個小根堆的數據結構來存放 callback。
BTW: 也有一種時間輪的方案,libco 中采取時間輪方案。
那么該如何啟動一個倒計時的鐘擺呢?從編程語言層面是沒有倒計時相關 api 的,還好操作系統內核給了我們一些解決方案。
BTW: 就好比說到 Linux 上定時任務,大家基本上都會想到 crontab,這也是內核給我們的能力的一種表現。
內核中具體的時鐘能力如何實現,不是我們的重點,這會涉及到 CPU 時鐘中斷,再底層還有硬件相關,感興趣的同學可以自行查閱。我們重點放在代碼中如何去使用操作系統提供的時鐘能力。
對于程序來說,我們的訴求就是設定了一個時間,當該時間到達(可以理解為超時),內核可以通知到應用程序。那么有哪些通知方式呢?
信號方案
那么我們先看信號的方案,一說到信號,可能就會想到 alarm(sleep 走開),這里舉個簡單的??。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
#include <unistd.h> #include <stdio.h> #include <signal.h> #include <stdlib.h> int curr = 0; int max = 3; void drink() { printf("利利噸噸噸\n"); curr++; if (curr < max) { alarm(1); // 1s 后發出 SIGALRM 信號 } else { kill(getpid(), SIGINT); } } void sigalrmCallback(int sig) { drink(); } void sigintCallback(int sig) { printf("利利喝不動了\n"); exit(0); } int main(int argc, char* argv[]) { signal(SIGALRM, sigalrmCallback); // 時鐘定時信號 signal(SIGINT, sigintCallback); // 終止信號 drink(); while (1) { pause(); // 等待信號 } return 0; } |
結果就不截圖了,代碼比較好理解,核心就是圍繞 SIGALRM 的監聽和觸發。
不過這里有一些問題,我們一一來看下
Q1: 精度問題,秒為精度,這太草了,肯定不能接受
A1: 不過我們可以用其他函數代替,比如 setitimer (精度為毫秒)
Q2: 無法多次調用 alrm
A2: 我們需要包裝一層,處理多次調用的情況。
不過上面兩個個問題還算好解決,針對以上兩個問題解法,這里有個改為 setitimer 優化版本,可見這里。
結果如下圖
可以看到精度提高了,并且支持了多次調用。
但是別高興的太早!問題還沒有結束!
Q3: 多線程情況下怎么辦?
A3: 信號在多線程下就是不靈活,一般做法需要用單獨的線程去監聽信號,其他線程屏蔽,寫起來很麻煩。
Q4: 信號可靠性?無論是 alrm 還是 setitimer 都是發送非實時信號。
A4: ???這太致命了,雖然是概率性的,但是總有在機場等艘船的感覺。
總結一下: 使用信號整體問題較多,雖然我們嘗試了一些解決方案,但是還是會存在無解的問題,所以這里也沒有真實使用信號的例子。
POSIX
針對剛剛的 Q1 到 Q4,根本性在于 alrm 和 setitmer 都不夠完善,為此 POSIX Timer 相關函數提供了解決方案。這一小節,我們主要看一下 POSIX Timer 相關函數,都是如何解決剛剛那些問題的。
-
精度問題
POSIX Timer 支持程度更高,支持到納秒級別。 -
無法多次調用
一個進程可以多次創建 Timer,相互獨立。1234567891011121314151617181920212223242526272829303132333435363738394041424344#include <unistd.h>#include <stdio.h>#include <signal.h>#include <stdlib.h>#include <pthread.h>decltype(SIGRTMIN) SIG_DRINK = SIGRTMIN + 1;void drink(u_int32_t second) {struct sigevent evp;timer_t timer;evp.sigev_notify = SIGEV_SIGNAL;evp.sigev_value.sival_ptr = &timer;evp.sigev_signo = SIG_DRINK; // 自定義信號int ret = timer_create(CLOCK_REALTIME, &evp, &timer); // 創建定時器if (ret) {printf("timer_create error\n");}struct itimerspec ts;ts.it_value.tv_sec = second / 1000;ts.it_value.tv_nsec = (second % 1000) * 1000;ts.it_interval.tv_sec = 0;ts.it_interval.tv_nsec = 0;ret = timer_settime(timer, CLOCK_REALTIME, &ts, nullptr); // 設置定時器if (ret) {printf("timer_settime error\n");}}int main(int argc, char* argv[]) {signal(SIG_DRINK, [](int sig) {printf("利利噸噸噸\n");});drink(3000);drink(2000);drink(1000);while (1) {pause();}return 0;}可以看到這里并不需要自己去處理多次調用,直接走創建定時器,設置定時器的流程就行。
-
多線程
POSIX Timer 提供了默認能力,當定時器結束的時候,可以啟動線程執行對應的函數。而且在 Linux 下,還擴展提供了往指定線程發送信號的能力。 -
信號可靠性
POSIX Timer 也可以指定信號,不過不再局限于非實時信號,可以選擇實時信號,???。
針對 POSIX Timer 的調用,下面畫了一張圖
具體函數使用、結構等可以看官方文檔,這里也給了一個簡易封裝的例子 posix 封裝為 setTimeout。
BTW: 其實本質上 POSIX Timer 也是信號方案,可以觀察進程信息中信號捕獲。SIGEV_THREAD 模式下,會啟動一個輔助線程,然后也是監聽到 SIGTIMER 信號,再做后續處理,源碼可見 https://code.woboq.org/userspace/glibc/sysdeps/unix/sysv/linux/timer_routines.c.html。
稍微總結一下,POSIX Timer 的方案,相比較之前已經完善了很多,不過還有一些缺點。
- 封裝處理較為麻煩
- 必須依賴 librt
使用該方案的開源項目有 gperftools,核心的代碼位置在 https://github.com/gperftools/gperftools/blob/master/src/profile-handler.cc#L282。
封裝方式和上文中的例子差不多,只是模式不一樣,這里就不詳細講解了。
多路復用
多路復用本身是為了解決服務器針對多連接時的阻塞問題,不過 select/poll/epoll 都提供了超時時間,而這一特性可以讓我們使用到定時器中。
以 boost 的 timer 為例,看如下代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
#include <iostream> #include <boost/asio.hpp> int main(int argc, char* argv[]) { std::cout << "利利喝!" << std::endl; boost::asio::io_service io; boost::asio::deadline_timer timer(io, boost::posix_time::seconds(3)); timer.wait(); std::cout << "3s后利利喝不動了!" << std::endl; return 0; } |
代碼很好理解,我們看一下 boost 是如何實現一個同步的 timer.wait 能力的,順著 deadline_timer_service 可以找到最后源碼位置在 https://github.com/boostorg/asio/blob/develop/include/boost/asio/detail/impl/socket_ops.ipp#L2162,簡單到無需多余講解。
BTW: 并且這里只用超時能力,不用擔心 select 本身在多路復用中的性能問題。
boost 中的異步定時器,也是采用了多路復用的方案,使用的是 epoll,其中用到了 timer_fd,先簡單的說一下 timer_fd。
timer_fd 是 linux2.6.25 后增加的 api,算是官方形式將定時能力和 IO 事件結合了起來。
異步定時器相比較同步復雜很多,所以我們通過分析 boost 中異步定時器的源碼來詳細展開下。
先畫個圖:
然后我們依次看一下。
-
將 timer_fd 綁定到 epoll_fd 上
epoll 使用一個文件描述符 (epoll_fd) 管理多個描述符 (例如這里的 timer_fd),這樣在用戶空間和內核空間的 copy 只需一次。
切記:這里 timer_fd 也需要進行復用,如果每次一個定時任務,都用一個新的 timer_fd,會有嚴重的性能浪費。123456789101112131415161718192021222324252627// epoll_reactor.ippepoll_reactor::epoll_reactor(boost::asio::execution_context& ctx): execution_context_service_base<epoll_reactor>(ctx),scheduler_(use_service<scheduler>(ctx)),mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(REACTOR_REGISTRATION, scheduler_.concurrency_hint())),interrupter_(),epoll_fd_(do_epoll_create()), // ?創建了 epoll_fd(epoll_create)timer_fd_(do_timerfd_create()), // ?創建了 timer_fdshutdown_(false),registered_descriptors_mutex_(mutex_.enabled()){// Add the interrupter's descriptor to epoll.epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);interrupter_.interrupt();// Add the timer descriptor to epoll.if (timer_fd_ != -1){ev.events = EPOLLIN | EPOLLERR;ev.data.ptr = &timer_fd_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev); // ?關聯 timer_fd 與 epoll_fd}}整體較好理解,幾個重要的點增加了注釋
-
添加任務到 timer_queue
1234567891011121314151617// timer_queue.hpp// Add a new timer to the queue. Returns true if this is the timer that is// earliest in the queue, in which case the reactor's event demultiplexing// function call may need to be interrupted and restarted.bool enqueue_timer(const time_type& time, per_timer_data& timer, wait_op* op){// Enqueue the timer object.// 略// Enqueue the individual timer operation.timer.op_queue_.push(op);// Interrupt reactor only if newly added timer is first to expire.return timer.heap_index_ == 0 && timer.op_queue_.front() == op;}// The heap of timers, with the earliest timer at the front.std::vector<heap_entry> heap_;enqueue_timer 里面大部分代碼我省略掉了,也就是在維護一個小根堆,讓最近的定時任務在前面,這樣可以方便第三步啟動和更新 timerfd。
BTW: 這里小根堆并不是像我們之前 demo 用了 priority_queue 方式,而是每次 push_back 會去 swap 修改 vector。 -
啟動/更新 timerfd
結合上一節的代碼,當 enqueue_timer 返回 true 的時候,就會去更新/啟動定時器。123456789101112131415161718192021// epoll_reactor.hppbool earliest = queue.enqueue_timer(time, timer, op);// ...if (earliest)update_timeout(); // ?更新定時器///////////////// epoll_reactor.ippvoid epoll_reactor::update_timeout(){#if defined(BOOST_ASIO_HAS_TIMERFD)if (timer_fd_ != -1){itimerspec new_timeout;itimerspec old_timeout;int flags = get_timeout(new_timeout);timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); // ?設置 timerfdreturn;}#endif // defined(BOOST_ASIO_HAS_TIMERFD)interrupt();}注意:如果不支持 timerfd,則會直接調用 epoll_ctl。
-
啟動 epoll_wait
-
收到 IO 事件,從 timer_queue 中判斷過期任務
這兩步的代碼位置太過相近,就放一起來說了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
// epoll_reactor.hpp void epoll_reactor::run(long usec, op_queue<operation>& ops) { // This code relies on the fact that the scheduler queues the reactor task // behind all descriptor operations generated by this function. This means, // that by the time we reach this point, any previously returned descriptor // operations have already been dequeued. Therefore it is now safe for us to // reuse and return them for the scheduler to queue again. // Calculate timeout. Check the timer queues only if timerfd is not in use. int timeout; if (usec == 0) timeout = 0; else { timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1); if (timer_fd_ == -1) { mutex::scoped_lock lock(mutex_); timeout = get_timeout(timeout); } } // Block on the epoll descriptor. epoll_event events[128]; int num_events = epoll_wait(epoll_fd_, events, 128, timeout); // ?核心 // ... #if defined(BOOST_ASIO_HAS_TIMERFD) bool check_timers = (timer_fd_ == -1); #else // defined(BOOST_ASIO_HAS_TIMERFD) bool check_timers = true; #endif // defined(BOOST_ASIO_HAS_TIMERFD) // Dispatch the waiting events. for (int i = 0; i < num_events; ++i) { void* ptr = events[i].data.ptr; if (ptr == &interrupter_) { // No need to reset the interrupter since we're leaving the descriptor // in a ready-to-read state and relying on edge-triggered notifications // to make it so that we only get woken up when the descriptor's epoll // registration is updated. #if defined(BOOST_ASIO_HAS_TIMERFD) if (timer_fd_ == -1) check_timers = true; #else // defined(BOOST_ASIO_HAS_TIMERFD) check_timers = true; #endif // defined(BOOST_ASIO_HAS_TIMERFD) } #if defined(BOOST_ASIO_HAS_TIMERFD) else if (ptr == &timer_fd_) { check_timers = true; } #endif // defined(BOOST_ASIO_HAS_TIMERFD) else { // The descriptor operation doesn't count as work in and of itself, so we // don't call work_started() here. This still allows the scheduler to // stop if the only remaining operations are descriptor operations. descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); if (!ops.is_enqueued(descriptor_data)) { descriptor_data->set_ready_events(events[i].events); ops.push(descriptor_data); } else { descriptor_data->add_ready_events(events[i].events); } } } if (check_timers) // ?check_timers 為 true 意味著需要檢測定時器隊列 { mutex::scoped_lock common_lock(mutex_); timer_queues_.get_ready_timers(ops); // ?獲取已經完成狀態的事件并執行 #if defined(BOOST_ASIO_HAS_TIMERFD) if (timer_fd_ != -1) { itimerspec new_timeout; itimerspec old_timeout; int flags = get_timeout(new_timeout); timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); // ?繼續設置定時器 } #endif // defined(BOOST_ASIO_HAS_TIMERFD) } } |
重點的就是有?標志的代碼。
timer_queues 里面發現的過期事件會添加到 op_queue 里面去,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// timer_queue.hpp virtual void get_ready_timers(op_queue<operation>& ops) { if (!heap_.empty()) { const time_type now = Time_Traits::now(); while (!heap_.empty() && !Time_Traits::less_than(now, heap_[0].time_)) { per_timer_data* timer = heap_[0].timer_; ops.push(timer->op_queue_); remove_timer(*timer); } } } |
op_queue 會在 scheduler.ipp 內進行執行。
以上就是 boost 中的異步定時器執行分解,感興趣的同學也可以自己下源碼來學習。
BTW: libevent 中定時任務做法與 boost 基本一致,chromium 底層的 message_pump 也有使用 libevent。
總結
我們了解到需要實現一個定時器/定時任務,重點需要兩塊:
- 存放執行回調的地方
大部分選擇是小根堆的方案,簡單方便;也有時間輪的方案。 - 調用操作系統提供的定時能力
我們分析了「信號」「POSIX Timer」「多路復用」,信號 pass,后二者中更推薦多路復用一些。
分析了 boost asio 的源碼,學習了多路復用能力用在定時方面的解決辦法。
如果你還想了解的更多,可以學習 libevent、libco、chromium 中定時器方面采取的方案。
歡迎一起討論研究~