小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

libevent多線程使用事項(xiàng)

 yishan886 2013-07-27

在linux平臺(tái)上使用c開發(fā)網(wǎng)絡(luò)程序的同志們一般情況下都對(duì)鼎鼎大名的libevent非常的熟悉了。但是一些新進(jìn)入此領(lǐng)域的new new people們對(duì)此都是一頭霧水。原本的迷茫再加上開源軟件一貫的“幫助文件”缺失作風(fēng),讓我們這些新手們顯的非常的無助。幸好有一些熱心的朋友們幫忙,才化險(xiǎn)為夷??!

    前幾天一直在開發(fā)一個(gè)locker server,雖然公司現(xiàn)有的locker server能很好的運(yùn)轉(zhuǎn),但是畢竟是net的,通用性不廣,當(dāng)我們要在linux上開發(fā)多集群系統(tǒng)的時(shí)候現(xiàn)有的locker server就未免顯得有點(diǎn)捉襟見肘了。正是在開發(fā)locker server的過程中使用到了libevent。

    總體上,libevent是很好用的。一兩個(gè)函數(shù)就能搞定你復(fù)雜的網(wǎng)絡(luò)通訊工作了。當(dāng)然了,這句話得用在你使用的是“單線程”的情況下。雖然在linux系統(tǒng)中,進(jìn)程的資源和window系統(tǒng)中進(jìn)程的資源相比輕量級(jí)很多,代價(jià)也相當(dāng)?shù)臎]有那么昂貴,所以很多的軟件都是使用“多進(jìn)程”方式實(shí)現(xiàn)的,比如大名鼎鼎的apache。但是在我們的系統(tǒng)中,我們使用了“單進(jìn)程多線程”的方式,這樣,我們就能在單機(jī)上啟動(dòng)多個(gè)進(jìn)程,以達(dá)到“偽分布式”的效果來達(dá)到測試的目的。

      那么這個(gè)時(shí)候就要注意libevent的使用了,因?yàn)閷?duì)于event_base來說,不是線程安全的。也就是說多線程不能share同一個(gè)event_base,就算是加鎖操作也不行。那么這個(gè)時(shí)候就只能采取“單線程單event_base”的策略了。我的做法是做一個(gè)task pool(任務(wù)對(duì)象池),每個(gè)任務(wù)會(huì)被一個(gè)thread執(zhí)行,當(dāng)然了,thread肯定也是從thread pool拿出來的,而在task pool初始化的時(shí)候,我就給每個(gè)task中的event_base初始化了對(duì)象,這樣,萬事大吉了。

      這個(gè)地方注意了以后,就開始說網(wǎng)絡(luò)通訊了。在使用libevent的時(shí)候,觸發(fā)事件是在接收到網(wǎng)絡(luò)連接(或者timeout事件超時(shí))的時(shí)候。所以你需要在事件處理函數(shù)中判斷時(shí)間源,其次libevent接收網(wǎng)絡(luò)通訊的字節(jié)流時(shí)是使用了libevnet中自帶的緩沖的,所以當(dāng)你接收的時(shí)候一定要注意累加,并且多次loop或者注冊 event_event中的事件。所以在我的task中,會(huì)有接收的data。當(dāng)然了如果你的協(xié)議是分為header和body的,通常header比較短,body比較長,而且在client,header和body通常是連續(xù)發(fā)送的,這樣,在使用libevent的時(shí)候,header和body是同時(shí)被接收到的,這點(diǎn)一定要注意,所以提醒你在接收數(shù)據(jù)的函數(shù)中,需要區(qū)分接收header部分還是body部分;當(dāng)body非常長,超過libevent的緩沖時(shí),是需要多次多次觸發(fā)接收函數(shù)的,這點(diǎn)也要注意,就是讓你需要在接收的時(shí)候除了區(qū)分header和body以外,還要注意一次接收不完全的情況下,對(duì)于數(shù)據(jù)需要累加。

      當(dāng)你在使用libevent時(shí),event_set事件時(shí),只要不是使用EV_PERSIST注冊的事件是不需要在接收完一次數(shù)據(jù)后多次event_add的,只有當(dāng)你不使用EV_PERSIST時(shí),你的事件才需要多次event_add到event_base中;當(dāng)然了,使用了EV_PERSIST注冊的函數(shù)在event_base被task pool回收時(shí)是要顯式的event_del該注冊事件的,沒有使用EV_PERSIST注冊的事件是不需要顯式的使用event_del刪除該事件的。

復(fù)制代碼
  1 static void  read_buffer(int client_socket_fd,short event_type,void *arg)
2 {
3 if(NULL == arg)
4 {
5 log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__);
6 return;
7 }
8 task_info_t *task_info = (task_info_t *) arg;
9
10 if(event_type == EV_TIMEOUT)
11 /*
12 這個(gè)地方注意需要判斷是否超時(shí)
13 因?yàn)槲襡vent_add事件的時(shí)候沒有使用ev_persist
14 所以當(dāng)超時(shí)時(shí)需要再add一次事件到event_base的loop中
15 */
16 {
17 if(0 != event_add(&task_info->on_read,&task_info->timeout))
18 {
19 log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error.");
20 close(task_info->on_read.ev_fd);
21 task_pool_push(task_info);
22
23 }
24 return;
25 }
26
27 int bytes;
28 /*
29 這個(gè)地方就是開始接收頭部
30 接收頭部時(shí),可能分為好幾次從緩沖中取得,所以需要一個(gè)while累加
31 */
32 while(header == task_info->read_type)//recv header
33 {
34 bytes = recv(client_socket_fd,task_info->header_buffer task_info->offset,REQUEST_LENGTH -task_info->offset,0);
35 if(0 > bytes )
36 {
37 if (errno == EAGAIN || errno == EWOULDBLOCK)
38 {
39 if(0 != event_add(&task_info->on_read, &task_info->timeout))
40 {
41 close(task_info->on_read.ev_fd);
42 task_pool_push(task_info);
43
44 log_error("File: "__FILE__", line: %d, "\
45 "event_add fail.", __LINE__);
46 return;
47 }
48 }
49 else
50 {
51 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
52 __LINE__, errno, strerror(errno));
53
54 close(task_info->on_read.ev_fd);
55 task_pool_push(task_info);
56 }
57 return;
58 }
59 else if(0 == bytes)
60 {
61 log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
62 __LINE__);
63 close(task_info->on_read.ev_fd);
64 task_pool_push(task_info);
65 return;
66 }
67
68 if(REQUEST_LENGTH > bytes task_info->offset)
69 {
70 log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__);
71 task_info->offset = bytes;
72 if(0 != event_add(&task_info->on_read, &task_info->timeout))
73 {
74 close(task_info->on_read.ev_fd);
75 task_pool_push(task_info);
76 log_error("File: "__FILE__", line: %d, "\
77 "event_add fail.", __LINE__);
78 return;
79 }
80 }
81 else
82 {
83 task_info->read_type = body;
84 deal_request_header(task_info);
85 task_info->body_buffer = (char *) malloc(task_info->request_info.length);
86 if(NULL == task_info->body_buffer)
87 {
88 log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__);
89 close(client_socket_fd);
90 task_pool_push(task_info);
91 return;
92 }
93 memset(task_info->body_buffer,0,task_info->request_info.length);
94 task_info->offset = 0;//set recv body buffer offset to 0
95 break;
96 }
97 }
98
99 /*
100 這個(gè)地方就是開始接收body,
101 和header一樣,也要考慮body多次接收累加的情況。
102 */
103 while(body == task_info->read_type)
104 {
105 bytes = recv(client_socket_fd,task_info->body_buffer task_info->offset,task_info->request_info.length-task_info->offset,0);
106 if(0 > bytes )
107 {
108 if (errno == EAGAIN || errno == EWOULDBLOCK)
109 {
110 if(0 != event_add(&task_info->on_read, &task_info->timeout))
111 {
112 close(task_info->on_read.ev_fd);
113 task_pool_push(task_info);
114
115 log_error("File: "__FILE__", line: %d, "\
116 "event_add fail.", __LINE__);
117 return;
118 }
119 }
120 else
121 {
122 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
123 __LINE__, errno, strerror(errno));
124
125 close(task_info->on_read.ev_fd);
126 task_pool_push(task_info);
127 }
128 return;
129 }
130 else if(0 == bytes)
131 {
132 log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
133 __LINE__);
134 close(task_info->on_read.ev_fd);
135 task_pool_push(task_info);
136 return;
137 }
138
139 if(task_info->request_info.length-task_info->offset > bytes)
140 {
141 log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__);
142 task_info->offset = bytes;
143 if(0 != event_add(&task_info->on_read, &task_info->timeout))
144 {
145 close(task_info->on_read.ev_fd);
146 task_pool_push(task_info);
147 log_error("File: "__FILE__", line: %d, "\
148 "event_add fail.", __LINE__);
149 return;
150 }
151 }
152 else
153 {
154 task_info->read_type = unspecified;
155 break;
156 }
157 }
158 deal_request_body(client_socket_fd,task_info);
159 return;
160 }
161
162
163 void deal_working_thread(void *arg)
164 {
165 log_info("debug to this.");
166 int client_socket_fd = (int) arg;
167 if(0 > client_socket_fd)
168 {
169 log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__);
170 return;
171 }
172 /*
173 設(shè)置網(wǎng)絡(luò)為非阻塞,libevent必須的
174 */
175 if(!set_nonblocking(client_socket_fd))
176 {
177 log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!",
178 __LINE__,strerror(errno));
179 close(client_socket_fd);
180 return;
181 }
182
183 task_info_t *task_info;
184 task_info = task_pool_pop();
185 /*
186 對(duì)event_base注冊事件回調(diào)函數(shù),
187 注意沒有使用EV_PERSIST
188 */
189 do
190 {
191 task_info->read_type = header;
192 event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info);
193 if(0 != event_base_set(task_info->event_base,&task_info->on_read))
194 {
195 log_error("File:"__FILE__",Line:%d.Associate the read header event to event_base is error.",__LINE__);
196 task_info->read_type = unspecified;
197 close(client_socket_fd);
198 task_pool_push(task_info);
199 break;
200 }
201
202 event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info);
203 if(0 != event_base_set(task_info->event_base,&task_info->on_write))
204 {
205 log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__);
206 task_info->read_type = unspecified;
207 close(client_socket_fd);
208 task_pool_push(task_info);
209 break;
210 }
211
212
213 if(0 != event_add(&task_info->on_read,&task_info->timeout))
214 {
215 log_error("File:"__FILE__",Line:%d.add the read header event to event_base is error.",__LINE__);
216 task_info->read_type = unspecified;
217 close(client_socket_fd);
218 task_pool_push(task_info);
219 break;
220 }
221
222 event_base_loop(task_info->event_base,EVLOOP_NONBLOCK);
223 }while(false);
224 return;
225 }
復(fù)制代碼

      

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多