/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ptlrpc/ptlrpcd.c
 */

/** \defgroup ptlrpcd PortalRPC daemon
 *
 * ptlrpcd is a special thread with its own set where other users may add
 * requests when they do not want to wait for their completion.
 * PtlRPCD will take care of sending such requests and then processing their
 * replies and calling completion callbacks as necessary.
 * The callbacks are called directly from ptlrpcd context.
 * It is important never to block significantly (especially on RPCs!) within
 * such a completion handler, or a deadlock might occur: ptlrpcd enters a
 * callback that attempts to send another RPC and waits for it to return,
 * and during that time ptlrpcd is completely blocked. So if, e.g., an
 * import fails, recovery cannot progress because connection requests are
 * also sent by ptlrpcd.
 *
 * @{
 */
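
/*
 * A minimal usage sketch (illustrative only; "my_interp" is a hypothetical
 * caller-supplied interpreter, not part of this file).  The interpreter
 * runs in ptlrpcd context, so, per the warning above, it must return
 * without blocking:
 *
 * \code
 *	static int my_interp(const struct lu_env *env,
 *			     struct ptlrpc_request *req, void *args, int rc)
 *	{
 *		return rc;
 *	}
 *
 *	req->rq_interpret_reply = my_interp;
 *	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 * \endcode
 */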
55 | ||
56 | #define DEBUG_SUBSYSTEM S_RPC | |
57 | ||
58 | # include <linux/libcfs/libcfs.h> | |
59 | ||
60 | #include <lustre_net.h> | |
61 | # include <lustre_lib.h> | |
62 | ||
63 | #include <lustre_ha.h> | |
64 | #include <obd_class.h> /* for obd_zombie */ | |
65 | #include <obd_support.h> /* for OBD_FAIL_CHECK */ | |
66 | #include <cl_object.h> /* cl_env_{get,put}() */ | |
67 | #include <lprocfs_status.h> | |
68 | ||
69 | #include "ptlrpc_internal.h" | |
70 | ||
struct ptlrpcd {
	int			pd_size;
	int			pd_index;
	int			pd_nthreads;
	struct ptlrpcd_ctl	pd_thread_rcv;
	struct ptlrpcd_ctl	pd_threads[0];
};

static int max_ptlrpcds;
CFS_MODULE_PARM(max_ptlrpcds, "i", int, 0644,
		"Max ptlrpcd thread count to be started.");

static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
CFS_MODULE_PARM(ptlrpcd_bind_policy, "i", int, 0644,
		"Ptlrpcd threads binding mode.");
static struct ptlrpcd *ptlrpcds;

struct mutex ptlrpcd_mutex;
static int ptlrpcd_users;

void ptlrpcd_wake(struct ptlrpc_request *req)
{
	struct ptlrpc_request_set *rq_set = req->rq_set;

	LASSERT(rq_set != NULL);

	wake_up(&rq_set->set_waitq);
}
EXPORT_SYMBOL(ptlrpcd_wake);

static struct ptlrpcd_ctl *
ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
{
	int idx = 0;

	if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
		return &ptlrpcds->pd_thread_rcv;

	switch (policy) {
	case PDL_POLICY_SAME:
		idx = smp_processor_id() % ptlrpcds->pd_nthreads;
		break;
	case PDL_POLICY_LOCAL:
		/* Before the CPU partition patches are available, process
		 * it the same as "PDL_POLICY_ROUND". */
# ifdef CFS_CPU_MODE_NUMA
# warning "fix this code to use new CPU partition APIs"
# endif
		/* Fall through to PDL_POLICY_ROUND until the CPU
		 * partition patches are available. */
		index = -1;
	case PDL_POLICY_PREFERRED:
		if (index >= 0 && index < num_online_cpus()) {
			idx = index % ptlrpcds->pd_nthreads;
			break;
		}
		/* Fall through to PDL_POLICY_ROUND for bad index. */
	default:
		/* Fall through to PDL_POLICY_ROUND for unknown policy. */
	case PDL_POLICY_ROUND:
		/* We do not care whether it is strict load balance. */
		idx = ptlrpcds->pd_index + 1;
		if (idx == smp_processor_id())
			idx++;
		idx %= ptlrpcds->pd_nthreads;
		ptlrpcds->pd_index = idx;
		break;
	}

	return &ptlrpcds->pd_threads[idx];
}

/**
 * Move all requests from an existing request set to the ptlrpcd queue.
 * All requests from the set must be in phase RQ_PHASE_NEW.
 */
void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
{
	struct list_head *tmp, *pos;
	struct ptlrpcd_ctl *pc;
	struct ptlrpc_request_set *new;
	int count, i;

	pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
	new = pc->pc_set;

	list_for_each_safe(pos, tmp, &set->set_requests) {
		struct ptlrpc_request *req =
			list_entry(pos, struct ptlrpc_request, rq_set_chain);

		LASSERT(req->rq_phase == RQ_PHASE_NEW);
		req->rq_set = new;
		req->rq_queued_time = cfs_time_current();
	}

	spin_lock(&new->set_new_req_lock);
	list_splice_init(&set->set_requests, &new->set_new_requests);
	i = atomic_read(&set->set_remaining);
	count = atomic_add_return(i, &new->set_new_count);
	atomic_set(&set->set_remaining, 0);
	spin_unlock(&new->set_new_req_lock);
	if (count == i) {
		wake_up(&new->set_waitq);

		/* XXX: It may be unnecessary to wake up all the partners,
		 * but to guarantee that the async RPCs are processed ASAP
		 * we have no better choice. This may be fixed in the
		 * future. */
		for (i = 0; i < pc->pc_npartners; i++)
			wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
	}
}
EXPORT_SYMBOL(ptlrpcd_add_rqset);
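
/*
 * A hedged sketch of the hand-off this function enables (illustrative only;
 * assumes the caller has already prepared requests in RQ_PHASE_NEW and
 * elides error handling).  After ptlrpcd_add_rqset() the requests belong to
 * ptlrpcd, so the drained set can be released:
 *
 * \code
 *	struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *	ptlrpc_set_add_req(set, req);
 *	ptlrpcd_add_rqset(set);
 *	ptlrpc_set_destroy(set);
 * \endcode
 */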

/**
 * Return the number of RPCs transferred from \a src to \a des.
 */
static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
			       struct ptlrpc_request_set *src)
{
	struct list_head *tmp, *pos;
	struct ptlrpc_request *req;
	int rc = 0;

	spin_lock(&src->set_new_req_lock);
	if (likely(!list_empty(&src->set_new_requests))) {
		list_for_each_safe(pos, tmp, &src->set_new_requests) {
			req = list_entry(pos, struct ptlrpc_request,
					 rq_set_chain);
			req->rq_set = des;
		}
		list_splice_init(&src->set_new_requests,
				 &des->set_requests);
		rc = atomic_read(&src->set_new_count);
		atomic_add(rc, &des->set_remaining);
		atomic_set(&src->set_new_count, 0);
	}
	spin_unlock(&src->set_new_req_lock);
	return rc;
}

/**
 * Requests that are added to the ptlrpcd queue are sent via
 * ptlrpcd_check->ptlrpc_check_set().
 */
void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
{
	struct ptlrpcd_ctl *pc;

	if (req->rq_reqmsg)
		lustre_msg_set_jobid(req->rq_reqmsg, NULL);

	spin_lock(&req->rq_lock);
	if (req->rq_invalid_rqset) {
		struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
						     back_to_sleep, NULL);

		req->rq_invalid_rqset = 0;
		spin_unlock(&req->rq_lock);
		l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
	} else if (req->rq_set) {
		/* If we have a valid "rq_set", just reuse it to avoid double
		 * linking. */
		LASSERT(req->rq_phase == RQ_PHASE_NEW);
		LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);

		/* ptlrpc_check_set will decrease the count */
		atomic_inc(&req->rq_set->set_remaining);
		spin_unlock(&req->rq_lock);
		wake_up(&req->rq_set->set_waitq);
		return;
	} else {
		spin_unlock(&req->rq_lock);
	}

	pc = ptlrpcd_select_pc(req, policy, idx);

	DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
		  req, pc->pc_name, pc->pc_index);

	ptlrpc_set_add_new_req(pc, req);
}
EXPORT_SYMBOL(ptlrpcd_add_req);

static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
{
	atomic_inc(&set->set_refcount);
}

/**
 * Check if there is more work to do on the ptlrpcd set.
 * Returns 1 if yes.
 */
static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
{
	struct list_head *tmp, *pos;
	struct ptlrpc_request *req;
	struct ptlrpc_request_set *set = pc->pc_set;
	int rc = 0;
	int rc2;

	if (atomic_read(&set->set_new_count)) {
		spin_lock(&set->set_new_req_lock);
		if (likely(!list_empty(&set->set_new_requests))) {
			list_splice_init(&set->set_new_requests,
					 &set->set_requests);
			atomic_add(atomic_read(&set->set_new_count),
				   &set->set_remaining);
			atomic_set(&set->set_new_count, 0);
			/*
			 * Need to calculate its timeout.
			 */
			rc = 1;
		}
		spin_unlock(&set->set_new_req_lock);
	}

	/* We should call lu_env_refill() before handling new requests to
	 * make sure that the env keys the requests depend on really exist.
	 */
	rc2 = lu_env_refill(env);
	if (rc2 != 0) {
		/*
		 * XXX This is a very awkward situation, because
		 * execution can neither continue (request
		 * interpreters assume that env is set up), nor repeat
		 * the loop (as this potentially results in a tight
		 * loop of -ENOMEM's).
		 *
		 * Fortunately, refill only ever does something when
		 * new modules are loaded, i.e., early during boot up.
		 */
		CERROR("Failure to refill session: %d\n", rc2);
		return rc;
	}

	if (atomic_read(&set->set_remaining))
		rc |= ptlrpc_check_set(env, set);

	if (!list_empty(&set->set_requests)) {
		/*
		 * XXX: our set never completes, so we prune the completed
		 * reqs after each iteration. boy could this be smarter.
		 */
		list_for_each_safe(pos, tmp, &set->set_requests) {
			req = list_entry(pos, struct ptlrpc_request,
					 rq_set_chain);
			if (req->rq_phase != RQ_PHASE_COMPLETE)
				continue;

			list_del_init(&req->rq_set_chain);
			req->rq_set = NULL;
			ptlrpc_req_finished(req);
		}
	}

	if (rc == 0) {
		/*
		 * If new requests have been added, make sure to wake up.
		 */
		rc = atomic_read(&set->set_new_count);

		/* If we have nothing to do, check whether we can take some
		 * work from our partner threads. */
		if (rc == 0 && pc->pc_npartners > 0) {
			struct ptlrpcd_ctl *partner;
			struct ptlrpc_request_set *ps;
			int first = pc->pc_cursor;

			do {
				partner = pc->pc_partners[pc->pc_cursor++];
				if (pc->pc_cursor >= pc->pc_npartners)
					pc->pc_cursor = 0;
				if (partner == NULL)
					continue;

				spin_lock(&partner->pc_lock);
				ps = partner->pc_set;
				if (ps == NULL) {
					spin_unlock(&partner->pc_lock);
					continue;
				}

				ptlrpc_reqset_get(ps);
				spin_unlock(&partner->pc_lock);

				if (atomic_read(&ps->set_new_count)) {
					rc = ptlrpcd_steal_rqset(set, ps);
					if (rc > 0)
						CDEBUG(D_RPCTRACE,
						       "transfer %d async RPCs [%d->%d]\n",
						       rc, partner->pc_index,
						       pc->pc_index);
				}
				ptlrpc_reqset_put(ps);
			} while (rc == 0 && pc->pc_cursor != first);
		}
	}

	return rc;
}

/**
 * Main ptlrpcd thread.
 * ptlrpc's code paths like to execute in process context, so we have this
 * thread which spins on a set which contains the RPCs and sends them.
 */
static int ptlrpcd(void *arg)
{
	struct ptlrpcd_ctl *pc = arg;
	struct ptlrpc_request_set *set = pc->pc_set;
	struct lu_env env = { .le_ses = NULL };
	int rc, exit = 0;

	unshare_fs_struct();
#if defined(CONFIG_SMP)
	if (test_bit(LIOD_BIND, &pc->pc_flags)) {
		int index = pc->pc_index;

		if (index >= 0 && index < num_possible_cpus()) {
			while (!cpu_online(index)) {
				if (++index >= num_possible_cpus())
					index = 0;
			}
			set_cpus_allowed_ptr(current,
					     cpumask_of_node(cpu_to_node(index)));
		}
	}
#endif
	/*
	 * XXX So far only "client" ptlrpcd uses an environment. In
	 * the future, the ptlrpcd thread (or a thread-set) has to be
	 * given an argument, describing its "scope".
	 */
	rc = lu_context_init(&env.le_ctx,
			     LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
	complete(&pc->pc_starting);

	if (rc != 0)
		return rc;

	/*
	 * This mainloop strongly resembles ptlrpc_set_wait() except that our
	 * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
	 * there are requests in the set. New requests come in on the set's
	 * new_req_list and ptlrpcd_check() moves them into the set.
	 */
	do {
		struct l_wait_info lwi;
		int timeout;

		timeout = ptlrpc_set_next_timeout(set);
		lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
				  ptlrpc_expired_set, set);

		lu_context_enter(&env.le_ctx);
		l_wait_event(set->set_waitq,
			     ptlrpcd_check(&env, pc), &lwi);
		lu_context_exit(&env.le_ctx);

		/*
		 * Abort inflight RPCs for the forced stop case.
		 */
		if (test_bit(LIOD_STOP, &pc->pc_flags)) {
			if (test_bit(LIOD_FORCE, &pc->pc_flags))
				ptlrpc_abort_set(set);
			exit++;
		}

		/*
		 * Let's make one more loop to make sure that ptlrpcd_check()
		 * copied all raced new RPCs into the set so we can kill them.
		 */
	} while (exit < 2);

	/*
	 * Wait for inflight requests to drain.
	 */
	if (!list_empty(&set->set_requests))
		ptlrpc_set_wait(set);
	lu_context_fini(&env.le_ctx);

	complete(&pc->pc_finishing);

	return 0;
}

/* XXX: We want multiple CPU cores to share the async RPC load, so we start
 * many ptlrpcd threads.  We also want to reduce the ptlrpcd overhead caused
 * by data transfer across CPU cores, so we bind each ptlrpcd thread to a
 * specific CPU core.  But binding all ptlrpcd threads may cause response
 * delays when some CPU core(s) are busy with other loads.
 *
 * For example: during "ls -l", some async RPCs for statahead are assigned
 * to ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite
 * busy with other non-ptlrpcd work, like "ls -l" itself (we want the
 * "ls -l" thread, the statahead thread, and the ptlrpcd thread to run in
 * parallel).  In such a case, the statahead async RPCs cannot be processed
 * in time, which is unexpected.  If ptlrpcd_0 could be re-scheduled on
 * another CPU core, it might do better, but that breaks the former data
 * transfer policy.
 *
 * So we should not blindly avoid the data transfer.  We make a compromise:
 * divide the ptlrpcd thread pool into two parts.  One part is for bound
 * mode: each ptlrpcd thread in this part is bound to some CPU core.  The
 * other part is for free mode: all the ptlrpcd threads in this part can be
 * scheduled on any CPU core.  We specify some partnership between bound
 * mode ptlrpcd thread(s) and free mode ptlrpcd thread(s), and the async
 * RPC load within the partners is shared.
 *
 * This can partly avoid cross-CPU data transfer (if the bound mode ptlrpcd
 * thread can be scheduled in time), and tries to guarantee that async RPCs
 * are processed ASAP (as long as the free mode ptlrpcd thread can be
 * scheduled on any CPU core).
 *
 * As for how to specify the partnership between bound mode ptlrpcd
 * thread(s) and free mode ptlrpcd thread(s), the simplest way is to use a
 * <free, bound> pair.  In the future, we can specify a more complex
 * partnership based on the patches for CPU partitions.  But before such
 * patches are available, we prefer to use the simplest one.
 */
# ifdef CFS_CPU_MODE_NUMA
# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
# endif
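
/*
 * A hedged illustration of the <free, bound> pairing that ptlrpcd_bind()
 * below builds for the default PDB_POLICY_PAIR, assuming four ptlrpcd
 * threads (derived from the code; other configurations differ):
 *
 *	ptlrpcd_0 (free)  <--partner-->  ptlrpcd_1 (bound, LIOD_BIND)
 *	ptlrpcd_2 (free)  <--partner-->  ptlrpcd_3 (bound, LIOD_BIND)
 *
 * Odd-indexed threads get LIOD_BIND and partner with the preceding
 * even-indexed free thread, so each pair shares its async RPC load.
 */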
static int ptlrpcd_bind(int index, int max)
{
	struct ptlrpcd_ctl *pc;
	int rc = 0;
#if defined(CONFIG_NUMA)
	cpumask_t mask;
#endif

	LASSERT(index <= max - 1);
	pc = &ptlrpcds->pd_threads[index];
	switch (ptlrpcd_bind_policy) {
	case PDB_POLICY_NONE:
		pc->pc_npartners = -1;
		break;
	case PDB_POLICY_FULL:
		pc->pc_npartners = 0;
		set_bit(LIOD_BIND, &pc->pc_flags);
		break;
	case PDB_POLICY_PAIR:
		LASSERT(max % 2 == 0);
		pc->pc_npartners = 1;
		break;
	case PDB_POLICY_NEIGHBOR:
#if defined(CONFIG_NUMA)
	{
		int i;

		mask = *cpumask_of_node(cpu_to_node(index));
		for (i = max; i < num_online_cpus(); i++)
			cpu_clear(i, mask);
		pc->pc_npartners = cpus_weight(mask) - 1;
		set_bit(LIOD_BIND, &pc->pc_flags);
	}
#else
		LASSERT(max >= 3);
		pc->pc_npartners = 2;
#endif
		break;
	default:
		CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
		rc = -EINVAL;
	}

	if (rc == 0 && pc->pc_npartners > 0) {
		OBD_ALLOC(pc->pc_partners,
			  sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
		if (pc->pc_partners == NULL) {
			pc->pc_npartners = 0;
			rc = -ENOMEM;
		} else {
			switch (ptlrpcd_bind_policy) {
			case PDB_POLICY_PAIR:
				if (index & 0x1) {
					set_bit(LIOD_BIND, &pc->pc_flags);
					pc->pc_partners[0] =
						&ptlrpcds->pd_threads[index - 1];
					ptlrpcds->pd_threads[index - 1].pc_partners[0] = pc;
				}
				break;
			case PDB_POLICY_NEIGHBOR:
#if defined(CONFIG_NUMA)
			{
				struct ptlrpcd_ctl *ppc;
				int i, pidx;

				/* Partners are cores in the same NUMA node.
				 * Set up the partnership only with ptlrpcd
				 * threads that are already initialized. */
				for (pidx = 0, i = 0; i < index; i++) {
					if (cpu_isset(i, mask)) {
						ppc = &ptlrpcds->pd_threads[i];
						pc->pc_partners[pidx++] = ppc;
						ppc->pc_partners[ppc->pc_npartners++] = pc;
					}
				}
				/* Adjust the number of partners to the
				 * number of partnerships actually set up. */
				pc->pc_npartners = pidx;
			}
#else
				if (index & 0x1)
					set_bit(LIOD_BIND, &pc->pc_flags);
				if (index > 0) {
					pc->pc_partners[0] =
						&ptlrpcds->pd_threads[index - 1];
					ptlrpcds->pd_threads[index - 1].pc_partners[1] = pc;
					if (index == max - 1) {
						pc->pc_partners[1] =
							&ptlrpcds->pd_threads[0];
						ptlrpcds->pd_threads[0].pc_partners[0] = pc;
					}
				}
#endif
				break;
			}
		}
	}

	return rc;
}

int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
{
	int rc;
	int env = 0;

	/*
	 * Do not allow starting a second thread for one pc.
	 */
	if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Starting second thread (%s) for same pc %p\n",
		      name, pc);
		return 0;
	}

	pc->pc_index = index;
	init_completion(&pc->pc_starting);
	init_completion(&pc->pc_finishing);
	spin_lock_init(&pc->pc_lock);
	strlcpy(pc->pc_name, name, sizeof(pc->pc_name));
	pc->pc_set = ptlrpc_prep_set();
	if (pc->pc_set == NULL)
		GOTO(out, rc = -ENOMEM);
	/*
	 * So far only "client" ptlrpcd uses an environment. In the future,
	 * the ptlrpcd thread (or a thread-set) has to be given an argument,
	 * describing its "scope".
	 */
	rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
	if (rc != 0)
		GOTO(out, rc);

	env = 1;
	{
		struct task_struct *task;

		if (index >= 0) {
			rc = ptlrpcd_bind(index, max);
			if (rc < 0)
				GOTO(out, rc);
		}

		task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
		if (IS_ERR(task))
			GOTO(out, rc = PTR_ERR(task));

		rc = 0;
		wait_for_completion(&pc->pc_starting);
	}
out:
	if (rc) {
		if (pc->pc_set != NULL) {
			struct ptlrpc_request_set *set = pc->pc_set;

			spin_lock(&pc->pc_lock);
			pc->pc_set = NULL;
			spin_unlock(&pc->pc_lock);
			ptlrpc_set_destroy(set);
		}
		if (env != 0)
			lu_context_fini(&pc->pc_env.le_ctx);
		clear_bit(LIOD_BIND, &pc->pc_flags);
		clear_bit(LIOD_START, &pc->pc_flags);
	}
	return rc;
}

void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
{
	if (!test_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Thread for pc %p was not started\n", pc);
		return;
	}

	set_bit(LIOD_STOP, &pc->pc_flags);
	if (force)
		set_bit(LIOD_FORCE, &pc->pc_flags);
	wake_up(&pc->pc_set->set_waitq);
}

void ptlrpcd_free(struct ptlrpcd_ctl *pc)
{
	struct ptlrpc_request_set *set = pc->pc_set;

	if (!test_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Thread for pc %p was not started\n", pc);
		goto out;
	}

	wait_for_completion(&pc->pc_finishing);
	lu_context_fini(&pc->pc_env.le_ctx);

	spin_lock(&pc->pc_lock);
	pc->pc_set = NULL;
	spin_unlock(&pc->pc_lock);
	ptlrpc_set_destroy(set);

	clear_bit(LIOD_START, &pc->pc_flags);
	clear_bit(LIOD_STOP, &pc->pc_flags);
	clear_bit(LIOD_FORCE, &pc->pc_flags);
	clear_bit(LIOD_BIND, &pc->pc_flags);

out:
	if (pc->pc_npartners > 0) {
		LASSERT(pc->pc_partners != NULL);

		OBD_FREE(pc->pc_partners,
			 sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
		pc->pc_partners = NULL;
	}
	pc->pc_npartners = 0;
}

static void ptlrpcd_fini(void)
{
	int i;

	if (ptlrpcds != NULL) {
		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
			ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
			ptlrpcd_free(&ptlrpcds->pd_threads[i]);
		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
		OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
		ptlrpcds = NULL;
	}
}

static int ptlrpcd_init(void)
{
	int nthreads = num_online_cpus();
	char name[16];
	int size, i = -1, j, rc = 0;

	if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
		nthreads = max_ptlrpcds;
	if (nthreads < 2)
		nthreads = 2;
	if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
		ptlrpcd_bind_policy = PDB_POLICY_PAIR;
	else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
		nthreads &= ~1; /* make sure it is even */

	size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
	OBD_ALLOC(ptlrpcds, size);
	if (ptlrpcds == NULL)
		GOTO(out, rc = -ENOMEM);

	snprintf(name, sizeof(name), "ptlrpcd_rcv");
	set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
	rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
	if (rc < 0)
		GOTO(out, rc);

	/* XXX: We start nthreads ptlrpc daemons. Each of them can process
	 * any non-recovery async RPC to improve overall async RPC
	 * efficiency.
	 *
	 * But there are some issues with async I/O RPCs and async non-I/O
	 * RPCs being processed in the same set in some cases. The ptlrpcd
	 * may be blocked by some async I/O RPC(s), which then prevents
	 * other async non-I/O RPC(s) from being processed in time.
	 *
	 * Maybe we should distinguish blocking async RPCs from non-blocking
	 * async RPCs, and process them in different ptlrpcd sets to avoid
	 * unnecessary dependencies. But how to distribute the async RPC
	 * load among all the ptlrpc daemons then becomes another problem. */
	for (i = 0; i < nthreads; i++) {
		snprintf(name, sizeof(name), "ptlrpcd_%d", i);
		rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
		if (rc < 0)
			GOTO(out, rc);
	}

	ptlrpcds->pd_size = size;
	ptlrpcds->pd_index = 0;
	ptlrpcds->pd_nthreads = nthreads;

out:
	if (rc != 0 && ptlrpcds != NULL) {
		for (j = 0; j <= i; j++)
			ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
		for (j = 0; j <= i; j++)
			ptlrpcd_free(&ptlrpcds->pd_threads[j]);
		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
		OBD_FREE(ptlrpcds, size);
		ptlrpcds = NULL;
	}

	/* Return rc rather than 0 so that a failed init is actually
	 * reported to ptlrpcd_addref(). */
	return rc;
}
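
/*
 * A worked example of the sizing logic above (derived from ptlrpcd_init();
 * illustrative, not normative): on a node with 7 online CPUs, max_ptlrpcds
 * unset, and the default PDB_POLICY_PAIR, nthreads starts at 7 and is
 * rounded down to 6 so that the <free, bound> pairs come out even; the
 * daemon set is then ptlrpcd_rcv plus ptlrpcd_0 .. ptlrpcd_5.
 */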

int ptlrpcd_addref(void)
{
	int rc = 0;

	mutex_lock(&ptlrpcd_mutex);
	if (++ptlrpcd_users == 1)
		rc = ptlrpcd_init();
	mutex_unlock(&ptlrpcd_mutex);
	return rc;
}
EXPORT_SYMBOL(ptlrpcd_addref);

void ptlrpcd_decref(void)
{
	mutex_lock(&ptlrpcd_mutex);
	if (--ptlrpcd_users == 0)
		ptlrpcd_fini();
	mutex_unlock(&ptlrpcd_mutex);
}
EXPORT_SYMBOL(ptlrpcd_decref);
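
/*
 * A minimal lifecycle sketch for users of this interface (illustrative
 * only; the surrounding caller code is hypothetical).  The first
 * ptlrpcd_addref() triggers ptlrpcd_init(); the matching last
 * ptlrpcd_decref() triggers ptlrpcd_fini():
 *
 * \code
 *	rc = ptlrpcd_addref();
 *	if (rc != 0)
 *		return rc;
 *	... queue async RPCs with ptlrpcd_add_req() ...
 *	ptlrpcd_decref();
 * \endcode
 */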

/** @} ptlrpcd */