在linux平臺(tái)上使用c開發(fā)網(wǎng)絡(luò)程序的同志們一般情況下都對(duì)鼎鼎大名的libevent非常的熟悉了。但是一些新進(jìn)入此領(lǐng)域的new new people們對(duì)此都是一頭霧水。原本的迷茫再加上開源軟件一貫的“幫助文件”缺失作風(fēng),讓我們這些新手們顯的非常的無助。幸好有一些熱心的朋友們幫忙,才化險(xiǎn)為夷??!
前幾天一直在開發(fā)一個(gè)locker server,雖然公司現(xiàn)有的locker server能很好的運(yùn)轉(zhuǎn),但是畢竟是net的,通用性不廣,當(dāng)我們要在linux上開發(fā)多集群系統(tǒng)的時(shí)候現(xiàn)有的locker server就未免顯得有點(diǎn)捉襟見肘了。正是在開發(fā)locker server的過程中使用到了libevent。
總體上,libevent是很好用的。一兩個(gè)函數(shù)就能搞定你復(fù)雜的網(wǎng)絡(luò)通訊工作了。當(dāng)然了,這句話得用在你使用的是“單線程”的情況下。雖然在linux系統(tǒng)中,進(jìn)程的資源和window系統(tǒng)中進(jìn)程的資源相比輕量級(jí)很多,代價(jià)也相當(dāng)?shù)臎]有那么昂貴,所以很多的軟件都是使用“多進(jìn)程”方式實(shí)現(xiàn)的,比如大名鼎鼎的apache。但是在我們的系統(tǒng)中,我們使用了“單進(jìn)程多線程”的方式,這樣,我們就能在單機(jī)上啟動(dòng)多個(gè)進(jìn)程,以達(dá)到“偽分布式”的效果來達(dá)到測試的目的。
那么這個(gè)時(shí)候就要注意libevent的使用了,因?yàn)閷?duì)于event_base來說,不是線程安全的。也就是說多線程不能share同一個(gè)event_base,就算是加鎖操作也不行。那么這個(gè)時(shí)候就只能采取“單線程單event_base”的策略了。我的做法是做一個(gè)task pool(任務(wù)對(duì)象池),每個(gè)任務(wù)會(huì)被一個(gè)thread執(zhí)行,當(dāng)然了,thread肯定也是從thread pool拿出來的,而在task pool初始化的時(shí)候,我就給每個(gè)task中的event_base初始化了對(duì)象,這樣,萬事大吉了。
這個(gè)地方注意了以后,就開始說網(wǎng)絡(luò)通訊了。在使用libevent的時(shí)候,觸發(fā)事件是在接收到網(wǎng)絡(luò)連接(或者timeout事件超時(shí))的時(shí)候。所以你需要在事件處理函數(shù)中判斷時(shí)間源,其次libevent接收網(wǎng)絡(luò)通訊的字節(jié)流時(shí)是使用了libevnet中自帶的緩沖的,所以當(dāng)你接收的時(shí)候一定要注意累加,并且多次loop或者注冊 event_event中的事件。所以在我的task中,會(huì)有接收的data。當(dāng)然了如果你的協(xié)議是分為header和body的,通常header比較短,body比較長,而且在client,header和body通常是連續(xù)發(fā)送的,這樣,在使用libevent的時(shí)候,header和body是同時(shí)被接收到的,這點(diǎn)一定要注意,所以提醒你在接收數(shù)據(jù)的函數(shù)中,需要區(qū)分接收header部分還是body部分;當(dāng)body非常長,超過libevent的緩沖時(shí),是需要多次多次觸發(fā)接收函數(shù)的,這點(diǎn)也要注意,就是讓你需要在接收的時(shí)候除了區(qū)分header和body以外,還要注意一次接收不完全的情況下,對(duì)于數(shù)據(jù)需要累加。
當(dāng)你在使用libevent時(shí),event_set事件時(shí),只要不是使用EV_PERSIST注冊的事件是不需要在接收完一次數(shù)據(jù)后多次event_add的,只有當(dāng)你不使用EV_PERSIST時(shí),你的事件才需要多次event_add到event_base中;當(dāng)然了,使用了EV_PERSIST注冊的函數(shù)在event_base被task pool回收時(shí)是要顯式的event_del該注冊事件的,沒有使用EV_PERSIST注冊的事件是不需要顯式的使用event_del刪除該事件的。
1 static void read_buffer(int client_socket_fd,short event_type,void *arg) 2 { 3 if(NULL == arg) 4 { 5 log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__); 6 return; 7 } 8 task_info_t *task_info = (task_info_t *) arg; 9 10 if(event_type == EV_TIMEOUT) 11 /* 12 這個(gè)地方注意需要判斷是否超時(shí) 13 因?yàn)槲襡vent_add事件的時(shí)候沒有使用ev_persist 14 所以當(dāng)超時(shí)時(shí)需要再add一次事件到event_base的loop中 15 */ 16 { 17 if(0 != event_add(&task_info->on_read,&task_info->timeout)) 18 { 19 log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error."); 20 close(task_info->on_read.ev_fd); 21 task_pool_push(task_info); 22 23 } 24 return; 25 } 26 27 int bytes; 28 /* 29 這個(gè)地方就是開始接收頭部 30 接收頭部時(shí),可能分為好幾次從緩沖中取得,所以需要一個(gè)while累加 31 */ 32 while(header == task_info->read_type)//recv header 33 { 34 bytes = recv(client_socket_fd,task_info->header_buffer task_info->offset,REQUEST_LENGTH -task_info->offset,0); 35 if(0 > bytes ) 36 { 37 if (errno == EAGAIN || errno == EWOULDBLOCK) 38 { 39 if(0 != event_add(&task_info->on_read, &task_info->timeout)) 40 { 41 close(task_info->on_read.ev_fd); 42 task_pool_push(task_info); 43 44 log_error("File: "__FILE__", line: %d, "\ 45 "event_add fail.", __LINE__); 46 return; 47 } 48 } 49 else 50 { 51 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", 52 __LINE__, errno, strerror(errno)); 53 54 close(task_info->on_read.ev_fd); 55 task_pool_push(task_info); 56 } 57 return; 58 } 59 else if(0 == bytes) 60 { 61 log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", 62 __LINE__); 63 close(task_info->on_read.ev_fd); 64 task_pool_push(task_info); 65 return; 66 } 67 68 if(REQUEST_LENGTH > bytes task_info->offset) 69 { 70 log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__); 71 task_info->offset = bytes; 72 if(0 != event_add(&task_info->on_read, &task_info->timeout)) 73 { 74 close(task_info->on_read.ev_fd); 75 task_pool_push(task_info); 76 log_error("File: "__FILE__", line: %d, "\ 77 "event_add fail.", __LINE__); 78 return; 79 } 80 } 81 else 82 { 83 task_info->read_type = body; 84 deal_request_header(task_info); 85 task_info->body_buffer = (char *) malloc(task_info->request_info.length); 86 if(NULL == task_info->body_buffer) 87 { 88 log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__); 89 close(client_socket_fd); 90 task_pool_push(task_info); 91 return; 92 } 93 memset(task_info->body_buffer,0,task_info->request_info.length); 94 task_info->offset = 0;//set recv body buffer offset to 0 95 break; 96 } 97 } 98 99 /* 100 這個(gè)地方就是開始接收body, 101 和header一樣,也要考慮body多次接收累加的情況。 102 */ 103 while(body == task_info->read_type) 104 { 105 bytes = recv(client_socket_fd,task_info->body_buffer task_info->offset,task_info->request_info.length-task_info->offset,0); 106 if(0 > bytes ) 107 { 108 if (errno == EAGAIN || errno == EWOULDBLOCK) 109 { 110 if(0 != event_add(&task_info->on_read, &task_info->timeout)) 111 { 112 close(task_info->on_read.ev_fd); 113 task_pool_push(task_info); 114 115 log_error("File: "__FILE__", line: %d, "\ 116 "event_add fail.", __LINE__); 117 return; 118 } 119 } 120 else 121 { 122 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", 123 __LINE__, errno, strerror(errno)); 124 125 close(task_info->on_read.ev_fd); 126 task_pool_push(task_info); 127 } 128 return; 129 } 130 else if(0 == bytes) 131 { 132 log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", 133 __LINE__); 134 close(task_info->on_read.ev_fd); 135 task_pool_push(task_info); 136 return; 137 } 138 139 if(task_info->request_info.length-task_info->offset > bytes) 140 { 141 log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__); 142 task_info->offset = bytes; 143 if(0 != event_add(&task_info->on_read, &task_info->timeout)) 144 { 145 close(task_info->on_read.ev_fd); 146 task_pool_push(task_info); 147 log_error("File: "__FILE__", line: %d, "\ 148 "event_add fail.", __LINE__); 149 return; 150 } 151 } 152 else 153 { 154 task_info->read_type = unspecified; 155 break; 156 } 157 } 158 deal_request_body(client_socket_fd,task_info); 159 return; 160 } 161 162 163 void deal_working_thread(void *arg) 164 { 165 log_info("debug to this."); 166 int client_socket_fd = (int) arg; 167 if(0 > client_socket_fd) 168 { 169 log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__); 170 return; 171 } 172 /* 173 設(shè)置網(wǎng)絡(luò)為非阻塞,libevent必須的 174 */ 175 if(!set_nonblocking(client_socket_fd)) 176 { 177 log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!", 178 __LINE__,strerror(errno)); 179 close(client_socket_fd); 180 return; 181 } 182 183 task_info_t *task_info; 184 task_info = task_pool_pop(); 185 /* 186 對(duì)event_base注冊事件回調(diào)函數(shù), 187 注意沒有使用EV_PERSIST 188 */ 189 do 190 { 191 task_info->read_type = header; 192 event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info); 193 if(0 != event_base_set(task_info->event_base,&task_info->on_read)) 194 { 195 log_error("File:"__FILE__",Line:%d.Associate the read header event to event_base is error.",__LINE__); 196 task_info->read_type = unspecified; 197 close(client_socket_fd); 198 task_pool_push(task_info); 199 break; 200 } 201 202 event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info); 203 if(0 != event_base_set(task_info->event_base,&task_info->on_write)) 204 { 205 log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__); 206 task_info->read_type = unspecified; 207 close(client_socket_fd); 208 task_pool_push(task_info); 209 break; 210 } 211 212 213 if(0 != event_add(&task_info->on_read,&task_info->timeout)) 214 { 215 log_error("File:"__FILE__",Line:%d.add the read header event to event_base is error.",__LINE__); 216 task_info->read_type = unspecified; 217 close(client_socket_fd); 218 task_pool_push(task_info); 219 break; 220 } 221 222 event_base_loop(task_info->event_base,EVLOOP_NONBLOCK); 223 }while(false); 224 return; 225 }
|