/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/framework.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 * Author: Liang Zhen  <liangzhen@clusterfs.com>
 */
42 #define DEBUG_SUBSYSTEM S_LNET
46 lst_sid_t LST_INVALID_SID
= {LNET_NID_ANY
, -1};
48 static int session_timeout
= 100;
49 module_param(session_timeout
, int, 0444);
50 MODULE_PARM_DESC(session_timeout
, "test session timeout in seconds (100 by default, 0 == never)");
52 static int rpc_timeout
= 64;
53 module_param(rpc_timeout
, int, 0644);
54 MODULE_PARM_DESC(rpc_timeout
, "rpc timeout in seconds (64 by default, 0 == never)");

/*
 * Byte-swap helpers for wire structures received from a peer of the
 * opposite endianness.  Each macro swaps the listed fields in place and
 * is wrapped in do { } while (0) so it behaves as a single statement.
 */
#define sfw_unpack_id(id)		\
do {					\
	__swab64s(&(id).nid);		\
	__swab32s(&(id).pid);		\
} while (0)

#define sfw_unpack_sid(sid)		\
do {					\
	__swab64s(&(sid).ses_nid);	\
	__swab64s(&(sid).ses_stamp);	\
} while (0)

#define sfw_unpack_fw_counters(fc)	  \
do {					  \
	__swab32s(&(fc).running_ms);	  \
	__swab32s(&(fc).active_batches);  \
	__swab32s(&(fc).zombie_sessions); \
	__swab32s(&(fc).brw_errors);	  \
	__swab32s(&(fc).ping_errors);	  \
} while (0)

#define sfw_unpack_rpc_counters(rc)	\
do {					\
	__swab32s(&(rc).errors);	\
	__swab32s(&(rc).rpcs_sent);	\
	__swab32s(&(rc).rpcs_rcvd);	\
	__swab32s(&(rc).rpcs_dropped);	\
	__swab32s(&(rc).rpcs_expired);	\
	__swab64s(&(rc).bulk_get);	\
	__swab64s(&(rc).bulk_put);	\
} while (0)

#define sfw_unpack_lnet_counters(lc)	\
do {					\
	__swab32s(&(lc).errors);	\
	__swab32s(&(lc).msgs_max);	\
	__swab32s(&(lc).msgs_alloc);	\
	__swab32s(&(lc).send_count);	\
	__swab32s(&(lc).recv_count);	\
	__swab32s(&(lc).drop_count);	\
	__swab32s(&(lc).route_count);	\
	__swab64s(&(lc).send_length);	\
	__swab64s(&(lc).recv_length);	\
	__swab64s(&(lc).drop_length);	\
	__swab64s(&(lc).route_length);	\
} while (0)

/* A test instance / batch is active while its nactive counter is non-zero. */
#define sfw_test_active(t)	(atomic_read(&(t)->tsi_nactive) != 0)
#define sfw_batch_active(b)	(atomic_read(&(b)->bat_nactive) != 0)
106 static struct smoketest_framework
{
107 struct list_head fw_zombie_rpcs
; /* RPCs to be recycled */
108 struct list_head fw_zombie_sessions
; /* stopping sessions */
109 struct list_head fw_tests
; /* registered test cases */
110 atomic_t fw_nzombies
; /* # zombie sessions */
111 spinlock_t fw_lock
; /* serialise */
112 sfw_session_t
*fw_session
; /* _the_ session */
113 int fw_shuttingdown
; /* shutdown in progress */
114 srpc_server_rpc_t
*fw_active_srpc
; /* running RPC */
118 int sfw_stop_batch(sfw_batch_t
*tsb
, int force
);
119 void sfw_destroy_session(sfw_session_t
*sn
);
121 static inline sfw_test_case_t
*
122 sfw_find_test_case(int id
)
124 sfw_test_case_t
*tsc
;
126 LASSERT(id
<= SRPC_SERVICE_MAX_ID
);
127 LASSERT(id
> SRPC_FRAMEWORK_SERVICE_MAX_ID
);
129 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
130 if (tsc
->tsc_srv_service
->sv_id
== id
)
138 sfw_register_test(srpc_service_t
*service
, sfw_test_client_ops_t
*cliops
)
140 sfw_test_case_t
*tsc
;
142 if (sfw_find_test_case(service
->sv_id
) != NULL
) {
143 CERROR("Failed to register test %s (%d)\n",
144 service
->sv_name
, service
->sv_id
);
148 LIBCFS_ALLOC(tsc
, sizeof(sfw_test_case_t
));
152 tsc
->tsc_cli_ops
= cliops
;
153 tsc
->tsc_srv_service
= service
;
155 list_add_tail(&tsc
->tsc_list
, &sfw_data
.fw_tests
);
160 sfw_add_session_timer(void)
162 sfw_session_t
*sn
= sfw_data
.fw_session
;
163 stt_timer_t
*timer
= &sn
->sn_timer
;
165 LASSERT(!sfw_data
.fw_shuttingdown
);
167 if (sn
== NULL
|| sn
->sn_timeout
== 0)
170 LASSERT(!sn
->sn_timer_active
);
172 sn
->sn_timer_active
= 1;
173 timer
->stt_expires
= ktime_get_real_seconds() + sn
->sn_timeout
;
174 stt_add_timer(timer
);
179 sfw_del_session_timer(void)
181 sfw_session_t
*sn
= sfw_data
.fw_session
;
183 if (sn
== NULL
|| !sn
->sn_timer_active
)
186 LASSERT(sn
->sn_timeout
!= 0);
188 if (stt_del_timer(&sn
->sn_timer
)) { /* timer defused */
189 sn
->sn_timer_active
= 0;
193 return EBUSY
; /* racing with sfw_session_expired() */
197 sfw_deactivate_session(void)
198 __must_hold(&sfw_data
.fw_lock
)
200 sfw_session_t
*sn
= sfw_data
.fw_session
;
203 sfw_test_case_t
*tsc
;
208 LASSERT(!sn
->sn_timer_active
);
210 sfw_data
.fw_session
= NULL
;
211 atomic_inc(&sfw_data
.fw_nzombies
);
212 list_add(&sn
->sn_list
, &sfw_data
.fw_zombie_sessions
);
214 spin_unlock(&sfw_data
.fw_lock
);
216 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
217 srpc_abort_service(tsc
->tsc_srv_service
);
220 spin_lock(&sfw_data
.fw_lock
);
222 list_for_each_entry(tsb
, &sn
->sn_batches
, bat_list
) {
223 if (sfw_batch_active(tsb
)) {
225 sfw_stop_batch(tsb
, 1);
230 return; /* wait for active batches to stop */
232 list_del_init(&sn
->sn_list
);
233 spin_unlock(&sfw_data
.fw_lock
);
235 sfw_destroy_session(sn
);
237 spin_lock(&sfw_data
.fw_lock
);
242 sfw_session_expired(void *data
)
244 sfw_session_t
*sn
= data
;
246 spin_lock(&sfw_data
.fw_lock
);
248 LASSERT(sn
->sn_timer_active
);
249 LASSERT(sn
== sfw_data
.fw_session
);
251 CWARN("Session expired! sid: %s-%llu, name: %s\n",
252 libcfs_nid2str(sn
->sn_id
.ses_nid
),
253 sn
->sn_id
.ses_stamp
, &sn
->sn_name
[0]);
255 sn
->sn_timer_active
= 0;
256 sfw_deactivate_session();
258 spin_unlock(&sfw_data
.fw_lock
);
262 sfw_init_session(sfw_session_t
*sn
, lst_sid_t sid
,
263 unsigned features
, const char *name
)
265 stt_timer_t
*timer
= &sn
->sn_timer
;
267 memset(sn
, 0, sizeof(sfw_session_t
));
268 INIT_LIST_HEAD(&sn
->sn_list
);
269 INIT_LIST_HEAD(&sn
->sn_batches
);
270 atomic_set(&sn
->sn_refcount
, 1); /* +1 for caller */
271 atomic_set(&sn
->sn_brw_errors
, 0);
272 atomic_set(&sn
->sn_ping_errors
, 0);
273 strlcpy(&sn
->sn_name
[0], name
, sizeof(sn
->sn_name
));
275 sn
->sn_timer_active
= 0;
277 sn
->sn_features
= features
;
278 sn
->sn_timeout
= session_timeout
;
279 sn
->sn_started
= cfs_time_current();
281 timer
->stt_data
= sn
;
282 timer
->stt_func
= sfw_session_expired
;
283 INIT_LIST_HEAD(&timer
->stt_list
);
286 /* completion handler for incoming framework RPCs */
288 sfw_server_rpc_done(struct srpc_server_rpc
*rpc
)
290 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
291 int status
= rpc
->srpc_status
;
294 "Incoming framework RPC done: service %s, peer %s, status %s:%d\n",
295 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
),
296 swi_state2str(rpc
->srpc_wi
.swi_state
),
299 if (rpc
->srpc_bulk
!= NULL
)
305 sfw_client_rpc_fini(srpc_client_rpc_t
*rpc
)
307 LASSERT(rpc
->crpc_bulk
.bk_niov
== 0);
308 LASSERT(list_empty(&rpc
->crpc_list
));
309 LASSERT(atomic_read(&rpc
->crpc_refcount
) == 0);
312 "Outgoing framework RPC done: service %d, peer %s, status %s:%d:%d\n",
313 rpc
->crpc_service
, libcfs_id2str(rpc
->crpc_dest
),
314 swi_state2str(rpc
->crpc_wi
.swi_state
),
315 rpc
->crpc_aborted
, rpc
->crpc_status
);
317 spin_lock(&sfw_data
.fw_lock
);
319 /* my callers must finish all RPCs before shutting me down */
320 LASSERT(!sfw_data
.fw_shuttingdown
);
321 list_add(&rpc
->crpc_list
, &sfw_data
.fw_zombie_rpcs
);
323 spin_unlock(&sfw_data
.fw_lock
);
327 sfw_find_batch(lst_bid_t bid
)
329 sfw_session_t
*sn
= sfw_data
.fw_session
;
334 list_for_each_entry(bat
, &sn
->sn_batches
, bat_list
) {
335 if (bat
->bat_id
.bat_id
== bid
.bat_id
)
343 sfw_bid2batch(lst_bid_t bid
)
345 sfw_session_t
*sn
= sfw_data
.fw_session
;
350 bat
= sfw_find_batch(bid
);
354 LIBCFS_ALLOC(bat
, sizeof(sfw_batch_t
));
359 bat
->bat_session
= sn
;
361 atomic_set(&bat
->bat_nactive
, 0);
362 INIT_LIST_HEAD(&bat
->bat_tests
);
364 list_add_tail(&bat
->bat_list
, &sn
->sn_batches
);
369 sfw_get_stats(srpc_stat_reqst_t
*request
, srpc_stat_reply_t
*reply
)
371 sfw_session_t
*sn
= sfw_data
.fw_session
;
372 sfw_counters_t
*cnt
= &reply
->str_fw
;
375 reply
->str_sid
= (sn
== NULL
) ? LST_INVALID_SID
: sn
->sn_id
;
377 if (request
->str_sid
.ses_nid
== LNET_NID_ANY
) {
378 reply
->str_status
= EINVAL
;
382 if (sn
== NULL
|| !sfw_sid_equal(request
->str_sid
, sn
->sn_id
)) {
383 reply
->str_status
= ESRCH
;
387 lnet_counters_get(&reply
->str_lnet
);
388 srpc_get_counters(&reply
->str_rpc
);
390 /* send over the msecs since the session was started
391 - with 32 bits to send, this is ~49 days */
392 cnt
->running_ms
= jiffies_to_msecs(jiffies
- sn
->sn_started
);
393 cnt
->brw_errors
= atomic_read(&sn
->sn_brw_errors
);
394 cnt
->ping_errors
= atomic_read(&sn
->sn_ping_errors
);
395 cnt
->zombie_sessions
= atomic_read(&sfw_data
.fw_nzombies
);
397 cnt
->active_batches
= 0;
398 list_for_each_entry(bat
, &sn
->sn_batches
, bat_list
) {
399 if (atomic_read(&bat
->bat_nactive
) > 0)
400 cnt
->active_batches
++;
403 reply
->str_status
= 0;
408 sfw_make_session(srpc_mksn_reqst_t
*request
, srpc_mksn_reply_t
*reply
)
410 sfw_session_t
*sn
= sfw_data
.fw_session
;
411 srpc_msg_t
*msg
= container_of(request
, srpc_msg_t
,
412 msg_body
.mksn_reqst
);
415 if (request
->mksn_sid
.ses_nid
== LNET_NID_ANY
) {
416 reply
->mksn_sid
= (sn
== NULL
) ? LST_INVALID_SID
: sn
->sn_id
;
417 reply
->mksn_status
= EINVAL
;
422 reply
->mksn_status
= 0;
423 reply
->mksn_sid
= sn
->sn_id
;
424 reply
->mksn_timeout
= sn
->sn_timeout
;
426 if (sfw_sid_equal(request
->mksn_sid
, sn
->sn_id
)) {
427 atomic_inc(&sn
->sn_refcount
);
431 if (!request
->mksn_force
) {
432 reply
->mksn_status
= EBUSY
;
433 cplen
= strlcpy(&reply
->mksn_name
[0], &sn
->sn_name
[0],
434 sizeof(reply
->mksn_name
));
435 if (cplen
>= sizeof(reply
->mksn_name
))
441 /* reject the request if it requires unknown features
442 * NB: old version will always accept all features because it's not
443 * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
444 * harmless because it will return zero feature to console, and it's
445 * console's responsibility to make sure all nodes in a session have
446 * same feature mask. */
447 if ((msg
->msg_ses_feats
& ~LST_FEATS_MASK
) != 0) {
448 reply
->mksn_status
= EPROTO
;
452 /* brand new or create by force */
453 LIBCFS_ALLOC(sn
, sizeof(sfw_session_t
));
455 CERROR("Dropping RPC (mksn) under memory pressure.\n");
459 sfw_init_session(sn
, request
->mksn_sid
,
460 msg
->msg_ses_feats
, &request
->mksn_name
[0]);
462 spin_lock(&sfw_data
.fw_lock
);
464 sfw_deactivate_session();
465 LASSERT(sfw_data
.fw_session
== NULL
);
466 sfw_data
.fw_session
= sn
;
468 spin_unlock(&sfw_data
.fw_lock
);
470 reply
->mksn_status
= 0;
471 reply
->mksn_sid
= sn
->sn_id
;
472 reply
->mksn_timeout
= sn
->sn_timeout
;
477 sfw_remove_session(srpc_rmsn_reqst_t
*request
, srpc_rmsn_reply_t
*reply
)
479 sfw_session_t
*sn
= sfw_data
.fw_session
;
481 reply
->rmsn_sid
= (sn
== NULL
) ? LST_INVALID_SID
: sn
->sn_id
;
483 if (request
->rmsn_sid
.ses_nid
== LNET_NID_ANY
) {
484 reply
->rmsn_status
= EINVAL
;
488 if (sn
== NULL
|| !sfw_sid_equal(request
->rmsn_sid
, sn
->sn_id
)) {
489 reply
->rmsn_status
= (sn
== NULL
) ? ESRCH
: EBUSY
;
493 if (!atomic_dec_and_test(&sn
->sn_refcount
)) {
494 reply
->rmsn_status
= 0;
498 spin_lock(&sfw_data
.fw_lock
);
499 sfw_deactivate_session();
500 spin_unlock(&sfw_data
.fw_lock
);
502 reply
->rmsn_status
= 0;
503 reply
->rmsn_sid
= LST_INVALID_SID
;
504 LASSERT(sfw_data
.fw_session
== NULL
);
509 sfw_debug_session(srpc_debug_reqst_t
*request
, srpc_debug_reply_t
*reply
)
511 sfw_session_t
*sn
= sfw_data
.fw_session
;
514 reply
->dbg_status
= ESRCH
;
515 reply
->dbg_sid
= LST_INVALID_SID
;
519 reply
->dbg_status
= 0;
520 reply
->dbg_sid
= sn
->sn_id
;
521 reply
->dbg_timeout
= sn
->sn_timeout
;
522 if (strlcpy(reply
->dbg_name
, &sn
->sn_name
[0], sizeof(reply
->dbg_name
))
523 >= sizeof(reply
->dbg_name
))
530 sfw_test_rpc_fini(srpc_client_rpc_t
*rpc
)
532 sfw_test_unit_t
*tsu
= rpc
->crpc_priv
;
533 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
535 /* Called with hold of tsi->tsi_lock */
536 LASSERT(list_empty(&rpc
->crpc_list
));
537 list_add(&rpc
->crpc_list
, &tsi
->tsi_free_rpcs
);
541 sfw_test_buffers(sfw_test_instance_t
*tsi
)
543 struct sfw_test_case
*tsc
= sfw_find_test_case(tsi
->tsi_service
);
544 struct srpc_service
*svc
= tsc
->tsc_srv_service
;
547 nbuf
= min(svc
->sv_wi_total
, tsi
->tsi_loop
) / svc
->sv_ncpts
;
548 return max(SFW_TEST_WI_MIN
, nbuf
+ SFW_TEST_WI_EXTRA
);
552 sfw_load_test(struct sfw_test_instance
*tsi
)
554 struct sfw_test_case
*tsc
;
555 struct srpc_service
*svc
;
559 LASSERT(tsi
!= NULL
);
560 tsc
= sfw_find_test_case(tsi
->tsi_service
);
561 nbuf
= sfw_test_buffers(tsi
);
562 LASSERT(tsc
!= NULL
);
563 svc
= tsc
->tsc_srv_service
;
565 if (tsi
->tsi_is_client
) {
566 tsi
->tsi_ops
= tsc
->tsc_cli_ops
;
570 rc
= srpc_service_add_buffers(svc
, nbuf
);
572 CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
573 svc
->sv_name
, nbuf
, rc
);
574 /* NB: this error handler is not strictly correct, because
575 * it may release more buffers than already allocated,
576 * but it doesn't matter because request portal should
577 * be lazy portal and will grow buffers if necessary. */
578 srpc_service_remove_buffers(svc
, nbuf
);
582 CDEBUG(D_NET
, "Reserved %d buffers for test %s\n",
583 nbuf
* (srpc_serv_is_framework(svc
) ?
584 1 : cfs_cpt_number(cfs_cpt_table
)), svc
->sv_name
);
589 sfw_unload_test(struct sfw_test_instance
*tsi
)
591 struct sfw_test_case
*tsc
= sfw_find_test_case(tsi
->tsi_service
);
593 LASSERT(tsc
!= NULL
);
595 if (tsi
->tsi_is_client
)
598 /* shrink buffers, because request portal is lazy portal
599 * which can grow buffers at runtime so we may leave
600 * some buffers behind, but never mind... */
601 srpc_service_remove_buffers(tsc
->tsc_srv_service
,
602 sfw_test_buffers(tsi
));
607 sfw_destroy_test_instance(sfw_test_instance_t
*tsi
)
609 srpc_client_rpc_t
*rpc
;
610 sfw_test_unit_t
*tsu
;
612 if (!tsi
->tsi_is_client
)
615 tsi
->tsi_ops
->tso_fini(tsi
);
617 LASSERT(!tsi
->tsi_stopping
);
618 LASSERT(list_empty(&tsi
->tsi_active_rpcs
));
619 LASSERT(!sfw_test_active(tsi
));
621 while (!list_empty(&tsi
->tsi_units
)) {
622 tsu
= list_entry(tsi
->tsi_units
.next
,
623 sfw_test_unit_t
, tsu_list
);
624 list_del(&tsu
->tsu_list
);
625 LIBCFS_FREE(tsu
, sizeof(*tsu
));
628 while (!list_empty(&tsi
->tsi_free_rpcs
)) {
629 rpc
= list_entry(tsi
->tsi_free_rpcs
.next
,
630 srpc_client_rpc_t
, crpc_list
);
631 list_del(&rpc
->crpc_list
);
632 LIBCFS_FREE(rpc
, srpc_client_rpc_size(rpc
));
636 sfw_unload_test(tsi
);
637 LIBCFS_FREE(tsi
, sizeof(*tsi
));
642 sfw_destroy_batch(sfw_batch_t
*tsb
)
644 sfw_test_instance_t
*tsi
;
646 LASSERT(!sfw_batch_active(tsb
));
647 LASSERT(list_empty(&tsb
->bat_list
));
649 while (!list_empty(&tsb
->bat_tests
)) {
650 tsi
= list_entry(tsb
->bat_tests
.next
,
651 sfw_test_instance_t
, tsi_list
);
652 list_del_init(&tsi
->tsi_list
);
653 sfw_destroy_test_instance(tsi
);
656 LIBCFS_FREE(tsb
, sizeof(sfw_batch_t
));
661 sfw_destroy_session(sfw_session_t
*sn
)
665 LASSERT(list_empty(&sn
->sn_list
));
666 LASSERT(sn
!= sfw_data
.fw_session
);
668 while (!list_empty(&sn
->sn_batches
)) {
669 batch
= list_entry(sn
->sn_batches
.next
,
670 sfw_batch_t
, bat_list
);
671 list_del_init(&batch
->bat_list
);
672 sfw_destroy_batch(batch
);
675 LIBCFS_FREE(sn
, sizeof(*sn
));
676 atomic_dec(&sfw_data
.fw_nzombies
);
681 sfw_unpack_addtest_req(srpc_msg_t
*msg
)
683 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
685 LASSERT(msg
->msg_type
== SRPC_MSG_TEST_REQST
);
686 LASSERT(req
->tsr_is_client
);
688 if (msg
->msg_magic
== SRPC_MSG_MAGIC
)
689 return; /* no flipping needed */
691 LASSERT(msg
->msg_magic
== __swab32(SRPC_MSG_MAGIC
));
693 if (req
->tsr_service
== SRPC_SERVICE_BRW
) {
694 if ((msg
->msg_ses_feats
& LST_FEAT_BULK_LEN
) == 0) {
695 test_bulk_req_t
*bulk
= &req
->tsr_u
.bulk_v0
;
697 __swab32s(&bulk
->blk_opc
);
698 __swab32s(&bulk
->blk_npg
);
699 __swab32s(&bulk
->blk_flags
);
702 test_bulk_req_v1_t
*bulk
= &req
->tsr_u
.bulk_v1
;
704 __swab16s(&bulk
->blk_opc
);
705 __swab16s(&bulk
->blk_flags
);
706 __swab32s(&bulk
->blk_offset
);
707 __swab32s(&bulk
->blk_len
);
713 if (req
->tsr_service
== SRPC_SERVICE_PING
) {
714 test_ping_req_t
*ping
= &req
->tsr_u
.ping
;
716 __swab32s(&ping
->png_size
);
717 __swab32s(&ping
->png_flags
);
726 sfw_add_test_instance(sfw_batch_t
*tsb
, srpc_server_rpc_t
*rpc
)
728 srpc_msg_t
*msg
= &rpc
->srpc_reqstbuf
->buf_msg
;
729 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
730 srpc_bulk_t
*bk
= rpc
->srpc_bulk
;
731 int ndest
= req
->tsr_ndest
;
732 sfw_test_unit_t
*tsu
;
733 sfw_test_instance_t
*tsi
;
737 LIBCFS_ALLOC(tsi
, sizeof(*tsi
));
739 CERROR("Can't allocate test instance for batch: %llu\n",
744 spin_lock_init(&tsi
->tsi_lock
);
745 atomic_set(&tsi
->tsi_nactive
, 0);
746 INIT_LIST_HEAD(&tsi
->tsi_units
);
747 INIT_LIST_HEAD(&tsi
->tsi_free_rpcs
);
748 INIT_LIST_HEAD(&tsi
->tsi_active_rpcs
);
750 tsi
->tsi_stopping
= 0;
751 tsi
->tsi_batch
= tsb
;
752 tsi
->tsi_loop
= req
->tsr_loop
;
753 tsi
->tsi_concur
= req
->tsr_concur
;
754 tsi
->tsi_service
= req
->tsr_service
;
755 tsi
->tsi_is_client
= !!(req
->tsr_is_client
);
756 tsi
->tsi_stoptsu_onerr
= !!(req
->tsr_stop_onerr
);
758 rc
= sfw_load_test(tsi
);
760 LIBCFS_FREE(tsi
, sizeof(*tsi
));
764 LASSERT(!sfw_batch_active(tsb
));
766 if (!tsi
->tsi_is_client
) {
767 /* it's test server, just add it to tsb */
768 list_add_tail(&tsi
->tsi_list
, &tsb
->bat_tests
);
773 LASSERT(bk
->bk_niov
* SFW_ID_PER_PAGE
>= (unsigned int)ndest
);
774 LASSERT((unsigned int)bk
->bk_len
>=
775 sizeof(lnet_process_id_packed_t
) * ndest
);
777 sfw_unpack_addtest_req(msg
);
778 memcpy(&tsi
->tsi_u
, &req
->tsr_u
, sizeof(tsi
->tsi_u
));
780 for (i
= 0; i
< ndest
; i
++) {
781 lnet_process_id_packed_t
*dests
;
782 lnet_process_id_packed_t id
;
785 dests
= page_address(bk
->bk_iovs
[i
/ SFW_ID_PER_PAGE
].kiov_page
);
786 LASSERT(dests
!= NULL
); /* my pages are within KVM always */
787 id
= dests
[i
% SFW_ID_PER_PAGE
];
788 if (msg
->msg_magic
!= SRPC_MSG_MAGIC
)
791 for (j
= 0; j
< tsi
->tsi_concur
; j
++) {
792 LIBCFS_ALLOC(tsu
, sizeof(sfw_test_unit_t
));
795 CERROR("Can't allocate tsu for %d\n",
800 tsu
->tsu_dest
.nid
= id
.nid
;
801 tsu
->tsu_dest
.pid
= id
.pid
;
802 tsu
->tsu_instance
= tsi
;
803 tsu
->tsu_private
= NULL
;
804 list_add_tail(&tsu
->tsu_list
, &tsi
->tsi_units
);
808 rc
= tsi
->tsi_ops
->tso_init(tsi
);
810 list_add_tail(&tsi
->tsi_list
, &tsb
->bat_tests
);
816 sfw_destroy_test_instance(tsi
);
821 sfw_test_unit_done(sfw_test_unit_t
*tsu
)
823 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
824 sfw_batch_t
*tsb
= tsi
->tsi_batch
;
825 sfw_session_t
*sn
= tsb
->bat_session
;
827 LASSERT(sfw_test_active(tsi
));
829 if (!atomic_dec_and_test(&tsi
->tsi_nactive
))
832 /* the test instance is done */
833 spin_lock(&tsi
->tsi_lock
);
835 tsi
->tsi_stopping
= 0;
837 spin_unlock(&tsi
->tsi_lock
);
839 spin_lock(&sfw_data
.fw_lock
);
841 if (!atomic_dec_and_test(&tsb
->bat_nactive
) ||/* tsb still active */
842 sn
== sfw_data
.fw_session
) { /* sn also active */
843 spin_unlock(&sfw_data
.fw_lock
);
847 LASSERT(!list_empty(&sn
->sn_list
)); /* I'm a zombie! */
849 list_for_each_entry(tsb
, &sn
->sn_batches
, bat_list
) {
850 if (sfw_batch_active(tsb
)) {
851 spin_unlock(&sfw_data
.fw_lock
);
856 list_del_init(&sn
->sn_list
);
857 spin_unlock(&sfw_data
.fw_lock
);
859 sfw_destroy_session(sn
);
864 sfw_test_rpc_done(srpc_client_rpc_t
*rpc
)
866 sfw_test_unit_t
*tsu
= rpc
->crpc_priv
;
867 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
870 tsi
->tsi_ops
->tso_done_rpc(tsu
, rpc
);
872 spin_lock(&tsi
->tsi_lock
);
874 LASSERT(sfw_test_active(tsi
));
875 LASSERT(!list_empty(&rpc
->crpc_list
));
877 list_del_init(&rpc
->crpc_list
);
879 /* batch is stopping or loop is done or get error */
880 if (tsi
->tsi_stopping
||
881 tsu
->tsu_loop
== 0 ||
882 (rpc
->crpc_status
!= 0 && tsi
->tsi_stoptsu_onerr
))
885 /* dec ref for poster */
886 srpc_client_rpc_decref(rpc
);
888 spin_unlock(&tsi
->tsi_lock
);
891 swi_schedule_workitem(&tsu
->tsu_worker
);
895 sfw_test_unit_done(tsu
);
900 sfw_create_test_rpc(sfw_test_unit_t
*tsu
, lnet_process_id_t peer
,
901 unsigned features
, int nblk
, int blklen
,
902 srpc_client_rpc_t
**rpcpp
)
904 srpc_client_rpc_t
*rpc
= NULL
;
905 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
907 spin_lock(&tsi
->tsi_lock
);
909 LASSERT(sfw_test_active(tsi
));
911 if (!list_empty(&tsi
->tsi_free_rpcs
)) {
912 /* pick request from buffer */
913 rpc
= list_entry(tsi
->tsi_free_rpcs
.next
,
914 srpc_client_rpc_t
, crpc_list
);
915 LASSERT(nblk
== rpc
->crpc_bulk
.bk_niov
);
916 list_del_init(&rpc
->crpc_list
);
919 spin_unlock(&tsi
->tsi_lock
);
922 rpc
= srpc_create_client_rpc(peer
, tsi
->tsi_service
, nblk
,
923 blklen
, sfw_test_rpc_done
,
924 sfw_test_rpc_fini
, tsu
);
926 srpc_init_client_rpc(rpc
, peer
, tsi
->tsi_service
, nblk
,
927 blklen
, sfw_test_rpc_done
,
928 sfw_test_rpc_fini
, tsu
);
932 CERROR("Can't create rpc for test %d\n", tsi
->tsi_service
);
936 rpc
->crpc_reqstmsg
.msg_ses_feats
= features
;
943 sfw_run_test(swi_workitem_t
*wi
)
945 sfw_test_unit_t
*tsu
= wi
->swi_workitem
.wi_data
;
946 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
947 srpc_client_rpc_t
*rpc
= NULL
;
949 LASSERT(wi
== &tsu
->tsu_worker
);
951 if (tsi
->tsi_ops
->tso_prep_rpc(tsu
, tsu
->tsu_dest
, &rpc
) != 0) {
952 LASSERT(rpc
== NULL
);
956 LASSERT(rpc
!= NULL
);
958 spin_lock(&tsi
->tsi_lock
);
960 if (tsi
->tsi_stopping
) {
961 list_add(&rpc
->crpc_list
, &tsi
->tsi_free_rpcs
);
962 spin_unlock(&tsi
->tsi_lock
);
966 if (tsu
->tsu_loop
> 0)
969 list_add_tail(&rpc
->crpc_list
, &tsi
->tsi_active_rpcs
);
970 spin_unlock(&tsi
->tsi_lock
);
972 rpc
->crpc_timeout
= rpc_timeout
;
974 spin_lock(&rpc
->crpc_lock
);
976 spin_unlock(&rpc
->crpc_lock
);
981 * No one can schedule me now since:
982 * - previous RPC, if any, has done and
983 * - no new RPC is initiated.
984 * - my batch is still active; no one can run it again now.
985 * Cancel pending schedules and prevent future schedule attempts:
987 swi_exit_workitem(wi
);
988 sfw_test_unit_done(tsu
);
993 sfw_run_batch(sfw_batch_t
*tsb
)
996 sfw_test_unit_t
*tsu
;
997 sfw_test_instance_t
*tsi
;
999 if (sfw_batch_active(tsb
)) {
1000 CDEBUG(D_NET
, "Batch already active: %llu (%d)\n",
1001 tsb
->bat_id
.bat_id
, atomic_read(&tsb
->bat_nactive
));
1005 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1006 if (!tsi
->tsi_is_client
) /* skip server instances */
1009 LASSERT(!tsi
->tsi_stopping
);
1010 LASSERT(!sfw_test_active(tsi
));
1012 atomic_inc(&tsb
->bat_nactive
);
1014 list_for_each_entry(tsu
, &tsi
->tsi_units
, tsu_list
) {
1015 atomic_inc(&tsi
->tsi_nactive
);
1016 tsu
->tsu_loop
= tsi
->tsi_loop
;
1017 wi
= &tsu
->tsu_worker
;
1018 swi_init_workitem(wi
, tsu
, sfw_run_test
,
1020 lnet_cpt_of_nid(tsu
->tsu_dest
.nid
)]);
1021 swi_schedule_workitem(wi
);
1029 sfw_stop_batch(sfw_batch_t
*tsb
, int force
)
1031 sfw_test_instance_t
*tsi
;
1032 srpc_client_rpc_t
*rpc
;
1034 if (!sfw_batch_active(tsb
)) {
1035 CDEBUG(D_NET
, "Batch %llu inactive\n", tsb
->bat_id
.bat_id
);
1039 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1040 spin_lock(&tsi
->tsi_lock
);
1042 if (!tsi
->tsi_is_client
||
1043 !sfw_test_active(tsi
) || tsi
->tsi_stopping
) {
1044 spin_unlock(&tsi
->tsi_lock
);
1048 tsi
->tsi_stopping
= 1;
1051 spin_unlock(&tsi
->tsi_lock
);
1055 /* abort launched rpcs in the test */
1056 list_for_each_entry(rpc
, &tsi
->tsi_active_rpcs
, crpc_list
) {
1057 spin_lock(&rpc
->crpc_lock
);
1059 srpc_abort_rpc(rpc
, -EINTR
);
1061 spin_unlock(&rpc
->crpc_lock
);
1064 spin_unlock(&tsi
->tsi_lock
);
1071 sfw_query_batch(sfw_batch_t
*tsb
, int testidx
, srpc_batch_reply_t
*reply
)
1073 sfw_test_instance_t
*tsi
;
1079 reply
->bar_active
= atomic_read(&tsb
->bat_nactive
);
1083 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1087 reply
->bar_active
= atomic_read(&tsi
->tsi_nactive
);
1095 sfw_free_pages(srpc_server_rpc_t
*rpc
)
1097 srpc_free_bulk(rpc
->srpc_bulk
);
1098 rpc
->srpc_bulk
= NULL
;
1102 sfw_alloc_pages(struct srpc_server_rpc
*rpc
, int cpt
, int npages
, int len
,
1105 LASSERT(rpc
->srpc_bulk
== NULL
);
1106 LASSERT(npages
> 0 && npages
<= LNET_MAX_IOV
);
1108 rpc
->srpc_bulk
= srpc_alloc_bulk(cpt
, npages
, len
, sink
);
1109 if (rpc
->srpc_bulk
== NULL
)
1116 sfw_add_test(srpc_server_rpc_t
*rpc
)
1118 sfw_session_t
*sn
= sfw_data
.fw_session
;
1119 srpc_test_reply_t
*reply
= &rpc
->srpc_replymsg
.msg_body
.tes_reply
;
1120 srpc_test_reqst_t
*request
;
1124 request
= &rpc
->srpc_reqstbuf
->buf_msg
.msg_body
.tes_reqst
;
1125 reply
->tsr_sid
= (sn
== NULL
) ? LST_INVALID_SID
: sn
->sn_id
;
1127 if (request
->tsr_loop
== 0 ||
1128 request
->tsr_concur
== 0 ||
1129 request
->tsr_sid
.ses_nid
== LNET_NID_ANY
||
1130 request
->tsr_ndest
> SFW_MAX_NDESTS
||
1131 (request
->tsr_is_client
&& request
->tsr_ndest
== 0) ||
1132 request
->tsr_concur
> SFW_MAX_CONCUR
||
1133 request
->tsr_service
> SRPC_SERVICE_MAX_ID
||
1134 request
->tsr_service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
) {
1135 reply
->tsr_status
= EINVAL
;
1139 if (sn
== NULL
|| !sfw_sid_equal(request
->tsr_sid
, sn
->sn_id
) ||
1140 sfw_find_test_case(request
->tsr_service
) == NULL
) {
1141 reply
->tsr_status
= ENOENT
;
1145 bat
= sfw_bid2batch(request
->tsr_bid
);
1147 CERROR("Dropping RPC (%s) from %s under memory pressure.\n",
1148 rpc
->srpc_scd
->scd_svc
->sv_name
,
1149 libcfs_id2str(rpc
->srpc_peer
));
1153 if (sfw_batch_active(bat
)) {
1154 reply
->tsr_status
= EBUSY
;
1158 if (request
->tsr_is_client
&& rpc
->srpc_bulk
== NULL
) {
1159 /* rpc will be resumed later in sfw_bulk_ready */
1160 int npg
= sfw_id_pages(request
->tsr_ndest
);
1163 if ((sn
->sn_features
& LST_FEAT_BULK_LEN
) == 0) {
1164 len
= npg
* PAGE_CACHE_SIZE
;
1167 len
= sizeof(lnet_process_id_packed_t
) *
1171 return sfw_alloc_pages(rpc
, CFS_CPT_ANY
, npg
, len
, 1);
1174 rc
= sfw_add_test_instance(bat
, rpc
);
1175 CDEBUG(rc
== 0 ? D_NET
: D_WARNING
,
1176 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1177 rc
== 0 ? "Added" : "Failed to add", request
->tsr_service
,
1178 request
->tsr_is_client
? "client" : "server",
1179 request
->tsr_loop
, request
->tsr_concur
, request
->tsr_ndest
);
1181 reply
->tsr_status
= (rc
< 0) ? -rc
: rc
;
1186 sfw_control_batch(srpc_batch_reqst_t
*request
, srpc_batch_reply_t
*reply
)
1188 sfw_session_t
*sn
= sfw_data
.fw_session
;
1192 reply
->bar_sid
= (sn
== NULL
) ? LST_INVALID_SID
: sn
->sn_id
;
1194 if (sn
== NULL
|| !sfw_sid_equal(request
->bar_sid
, sn
->sn_id
)) {
1195 reply
->bar_status
= ESRCH
;
1199 bat
= sfw_find_batch(request
->bar_bid
);
1201 reply
->bar_status
= ENOENT
;
1205 switch (request
->bar_opc
) {
1206 case SRPC_BATCH_OPC_RUN
:
1207 rc
= sfw_run_batch(bat
);
1210 case SRPC_BATCH_OPC_STOP
:
1211 rc
= sfw_stop_batch(bat
, request
->bar_arg
);
1214 case SRPC_BATCH_OPC_QUERY
:
1215 rc
= sfw_query_batch(bat
, request
->bar_testidx
, reply
);
1219 return -EINVAL
; /* drop it */
1222 reply
->bar_status
= (rc
< 0) ? -rc
: rc
;
1227 sfw_handle_server_rpc(struct srpc_server_rpc
*rpc
)
1229 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
1230 srpc_msg_t
*reply
= &rpc
->srpc_replymsg
;
1231 srpc_msg_t
*request
= &rpc
->srpc_reqstbuf
->buf_msg
;
1232 unsigned features
= LST_FEATS_MASK
;
1235 LASSERT(sfw_data
.fw_active_srpc
== NULL
);
1236 LASSERT(sv
->sv_id
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
1238 spin_lock(&sfw_data
.fw_lock
);
1240 if (sfw_data
.fw_shuttingdown
) {
1241 spin_unlock(&sfw_data
.fw_lock
);
1245 /* Remove timer to avoid racing with it or expiring active session */
1246 if (sfw_del_session_timer() != 0) {
1247 CERROR("Dropping RPC (%s) from %s: racing with expiry timer.",
1248 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
));
1249 spin_unlock(&sfw_data
.fw_lock
);
1253 sfw_data
.fw_active_srpc
= rpc
;
1254 spin_unlock(&sfw_data
.fw_lock
);
1256 sfw_unpack_message(request
);
1257 LASSERT(request
->msg_type
== srpc_service2request(sv
->sv_id
));
1259 /* rpc module should have checked this */
1260 LASSERT(request
->msg_version
== SRPC_MSG_VERSION
);
1262 if (sv
->sv_id
!= SRPC_SERVICE_MAKE_SESSION
&&
1263 sv
->sv_id
!= SRPC_SERVICE_DEBUG
) {
1264 sfw_session_t
*sn
= sfw_data
.fw_session
;
1267 sn
->sn_features
!= request
->msg_ses_feats
) {
1268 CNETERR("Features of framework RPC don't match features of current session: %x/%x\n",
1269 request
->msg_ses_feats
, sn
->sn_features
);
1270 reply
->msg_body
.reply
.status
= EPROTO
;
1271 reply
->msg_body
.reply
.sid
= sn
->sn_id
;
1275 } else if ((request
->msg_ses_feats
& ~LST_FEATS_MASK
) != 0) {
1276 /* NB: at this point, old version will ignore features and
1277 * create new session anyway, so console should be able
1279 reply
->msg_body
.reply
.status
= EPROTO
;
1283 switch (sv
->sv_id
) {
1286 case SRPC_SERVICE_TEST
:
1287 rc
= sfw_add_test(rpc
);
1290 case SRPC_SERVICE_BATCH
:
1291 rc
= sfw_control_batch(&request
->msg_body
.bat_reqst
,
1292 &reply
->msg_body
.bat_reply
);
1295 case SRPC_SERVICE_QUERY_STAT
:
1296 rc
= sfw_get_stats(&request
->msg_body
.stat_reqst
,
1297 &reply
->msg_body
.stat_reply
);
1300 case SRPC_SERVICE_DEBUG
:
1301 rc
= sfw_debug_session(&request
->msg_body
.dbg_reqst
,
1302 &reply
->msg_body
.dbg_reply
);
1305 case SRPC_SERVICE_MAKE_SESSION
:
1306 rc
= sfw_make_session(&request
->msg_body
.mksn_reqst
,
1307 &reply
->msg_body
.mksn_reply
);
1310 case SRPC_SERVICE_REMOVE_SESSION
:
1311 rc
= sfw_remove_session(&request
->msg_body
.rmsn_reqst
,
1312 &reply
->msg_body
.rmsn_reply
);
1316 if (sfw_data
.fw_session
!= NULL
)
1317 features
= sfw_data
.fw_session
->sn_features
;
1319 reply
->msg_ses_feats
= features
;
1320 rpc
->srpc_done
= sfw_server_rpc_done
;
1321 spin_lock(&sfw_data
.fw_lock
);
1323 if (!sfw_data
.fw_shuttingdown
)
1324 sfw_add_session_timer();
1326 sfw_data
.fw_active_srpc
= NULL
;
1327 spin_unlock(&sfw_data
.fw_lock
);
1332 sfw_bulk_ready(struct srpc_server_rpc
*rpc
, int status
)
1334 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
1337 LASSERT(rpc
->srpc_bulk
!= NULL
);
1338 LASSERT(sv
->sv_id
== SRPC_SERVICE_TEST
);
1339 LASSERT(sfw_data
.fw_active_srpc
== NULL
);
1340 LASSERT(rpc
->srpc_reqstbuf
->buf_msg
.msg_body
.tes_reqst
.tsr_is_client
);
1342 spin_lock(&sfw_data
.fw_lock
);
1345 CERROR("Bulk transfer failed for RPC: service %s, peer %s, status %d\n",
1346 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
), status
);
1347 spin_unlock(&sfw_data
.fw_lock
);
1351 if (sfw_data
.fw_shuttingdown
) {
1352 spin_unlock(&sfw_data
.fw_lock
);
1356 if (sfw_del_session_timer() != 0) {
1357 CERROR("Dropping RPC (%s) from %s: racing with expiry timer",
1358 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
));
1359 spin_unlock(&sfw_data
.fw_lock
);
1363 sfw_data
.fw_active_srpc
= rpc
;
1364 spin_unlock(&sfw_data
.fw_lock
);
1366 rc
= sfw_add_test(rpc
);
1368 spin_lock(&sfw_data
.fw_lock
);
1370 if (!sfw_data
.fw_shuttingdown
)
1371 sfw_add_session_timer();
1373 sfw_data
.fw_active_srpc
= NULL
;
1374 spin_unlock(&sfw_data
.fw_lock
);
/*
 * sfw_create_rpc - obtain a client RPC for a framework service.
 *
 * peer:     destination process id.
 * service:  service id; asserted to be <= SRPC_FRAMEWORK_SERVICE_MAX_ID.
 * features: value stored in the request message's msg_ses_feats.
 * nbulkiov: number of bulk iovs; 0 allows reuse of a cached zombie RPC.
 * bulklen:  bulk length handed to srpc_create_client_rpc().
 * done:     completion callback.
 * priv:     caller data passed to srpc_init_client_rpc() on the reuse path.
 *
 * A bulk-less request first tries to recycle the head of
 * sfw_data.fw_zombie_rpcs (under fw_lock).  The other visible path
 * allocates through srpc_create_client_rpc().
 *
 * NOTE(review): the condition guarding the allocation path, the last
 * argument of srpc_create_client_rpc() and the return statement are not
 * part of this extract.
 */
1379 sfw_create_rpc(lnet_process_id_t peer
, int service
,
1380 unsigned features
, int nbulkiov
, int bulklen
,
1381 void (*done
)(srpc_client_rpc_t
*), void *priv
)
1383 srpc_client_rpc_t
*rpc
= NULL
;
1385 spin_lock(&sfw_data
.fw_lock
);
1387 LASSERT(!sfw_data
.fw_shuttingdown
);
1388 LASSERT(service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
/* Reuse path: take the first zombie RPC off the list and re-initialise it
 * with no bulk (0, 0) and sfw_client_rpc_fini as its fini callback. */
1390 if (nbulkiov
== 0 && !list_empty(&sfw_data
.fw_zombie_rpcs
)) {
1391 rpc
= list_entry(sfw_data
.fw_zombie_rpcs
.next
,
1392 srpc_client_rpc_t
, crpc_list
);
1393 list_del(&rpc
->crpc_list
);
1395 srpc_init_client_rpc(rpc
, peer
, service
, 0, 0,
1396 done
, sfw_client_rpc_fini
, priv
);
1399 spin_unlock(&sfw_data
.fw_lock
);
/* Allocation path: RPCs with bulk get a NULL fini callback, bulk-less ones
 * get sfw_client_rpc_fini. */
1402 rpc
= srpc_create_client_rpc(peer
, service
,
1403 nbulkiov
, bulklen
, done
,
1404 nbulkiov
!= 0 ? NULL
:
1405 sfw_client_rpc_fini
,
/* Stamp the feature bits on the request message when an RPC was obtained. */
1409 if (rpc
!= NULL
) /* "session" is concept in framework */
1410 rpc
->crpc_reqstmsg
.msg_ses_feats
= features
;
/*
 * sfw_unpack_message - byte-swap the body of a framework message in place.
 *
 * msg: message to convert.  When msg_magic already equals SRPC_MSG_MAGIC
 *      nothing is touched; otherwise the magic is asserted to be the
 *      byte-swapped SRPC_MSG_MAGIC and the body matching msg_type has its
 *      fields swapped with __swab32s()/__swab64s() and the sfw_unpack_*
 *      helper macros.
 *
 * NOTE(review): the statements that end each msg_type branch are not part
 * of this extract.
 */
1416 sfw_unpack_message(srpc_msg_t
*msg
)
1418 if (msg
->msg_magic
== SRPC_MSG_MAGIC
)
1419 return; /* no flipping needed */
1421 /* srpc module should guarantee I wouldn't get crap */
1422 LASSERT(msg
->msg_magic
== __swab32(SRPC_MSG_MAGIC
));
/* Statistics request: type, reply id, session id. */
1424 if (msg
->msg_type
== SRPC_MSG_STAT_REQST
) {
1425 srpc_stat_reqst_t
*req
= &msg
->msg_body
.stat_reqst
;
1427 __swab32s(&req
->str_type
);
1428 __swab64s(&req
->str_rpyid
);
1429 sfw_unpack_sid(req
->str_sid
);
/* Statistics reply: status, session id, and the framework, RPC and LNet
 * counter blocks. */
1433 if (msg
->msg_type
== SRPC_MSG_STAT_REPLY
) {
1434 srpc_stat_reply_t
*rep
= &msg
->msg_body
.stat_reply
;
1436 __swab32s(&rep
->str_status
);
1437 sfw_unpack_sid(rep
->str_sid
);
1438 sfw_unpack_fw_counters(rep
->str_fw
);
1439 sfw_unpack_rpc_counters(rep
->str_rpc
);
1440 sfw_unpack_lnet_counters(rep
->str_lnet
);
/* Make-session request: reply id, force flag, session id. */
1444 if (msg
->msg_type
== SRPC_MSG_MKSN_REQST
) {
1445 srpc_mksn_reqst_t
*req
= &msg
->msg_body
.mksn_reqst
;
1447 __swab64s(&req
->mksn_rpyid
);
1448 __swab32s(&req
->mksn_force
);
1449 sfw_unpack_sid(req
->mksn_sid
);
/* Make-session reply: status, timeout, session id. */
1453 if (msg
->msg_type
== SRPC_MSG_MKSN_REPLY
) {
1454 srpc_mksn_reply_t
*rep
= &msg
->msg_body
.mksn_reply
;
1456 __swab32s(&rep
->mksn_status
);
1457 __swab32s(&rep
->mksn_timeout
);
1458 sfw_unpack_sid(rep
->mksn_sid
);
/* Remove-session request: reply id, session id. */
1462 if (msg
->msg_type
== SRPC_MSG_RMSN_REQST
) {
1463 srpc_rmsn_reqst_t
*req
= &msg
->msg_body
.rmsn_reqst
;
1465 __swab64s(&req
->rmsn_rpyid
);
1466 sfw_unpack_sid(req
->rmsn_sid
);
/* Remove-session reply: status, session id. */
1470 if (msg
->msg_type
== SRPC_MSG_RMSN_REPLY
) {
1471 srpc_rmsn_reply_t
*rep
= &msg
->msg_body
.rmsn_reply
;
1473 __swab32s(&rep
->rmsn_status
);
1474 sfw_unpack_sid(rep
->rmsn_sid
);
/* Debug request: reply id, flags, session id. */
1478 if (msg
->msg_type
== SRPC_MSG_DEBUG_REQST
) {
1479 srpc_debug_reqst_t
*req
= &msg
->msg_body
.dbg_reqst
;
1481 __swab64s(&req
->dbg_rpyid
);
1482 __swab32s(&req
->dbg_flags
);
1483 sfw_unpack_sid(req
->dbg_sid
);
/* Debug reply: batch count, timeout, session id. */
1487 if (msg
->msg_type
== SRPC_MSG_DEBUG_REPLY
) {
1488 srpc_debug_reply_t
*rep
= &msg
->msg_body
.dbg_reply
;
1490 __swab32s(&rep
->dbg_nbatch
);
1491 __swab32s(&rep
->dbg_timeout
);
1492 sfw_unpack_sid(rep
->dbg_sid
);
/* Batch request: opcode, reply id, test index, argument, session id and
 * batch id. */
1496 if (msg
->msg_type
== SRPC_MSG_BATCH_REQST
) {
1497 srpc_batch_reqst_t
*req
= &msg
->msg_body
.bat_reqst
;
1499 __swab32s(&req
->bar_opc
);
1500 __swab64s(&req
->bar_rpyid
);
1501 __swab32s(&req
->bar_testidx
);
1502 __swab32s(&req
->bar_arg
);
1503 sfw_unpack_sid(req
->bar_sid
);
1504 __swab64s(&req
->bar_bid
.bat_id
);
/* Batch reply: status, session id. */
1508 if (msg
->msg_type
== SRPC_MSG_BATCH_REPLY
) {
1509 srpc_batch_reply_t
*rep
= &msg
->msg_body
.bat_reply
;
1511 __swab32s(&rep
->bar_status
);
1512 sfw_unpack_sid(rep
->bar_sid
);
/* Test request: reply id, bulk id, loop, ndest, concurrency, service,
 * session id and batch id. */
1516 if (msg
->msg_type
== SRPC_MSG_TEST_REQST
) {
1517 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
1519 __swab64s(&req
->tsr_rpyid
);
1520 __swab64s(&req
->tsr_bulkid
);
1521 __swab32s(&req
->tsr_loop
);
1522 __swab32s(&req
->tsr_ndest
);
1523 __swab32s(&req
->tsr_concur
);
1524 __swab32s(&req
->tsr_service
);
1525 sfw_unpack_sid(req
->tsr_sid
);
1526 __swab64s(&req
->tsr_bid
.bat_id
);
/* Test reply: status, session id. */
1530 if (msg
->msg_type
== SRPC_MSG_TEST_REPLY
) {
1531 srpc_test_reply_t
*rep
= &msg
->msg_body
.tes_reply
;
1533 __swab32s(&rep
->tsr_status
);
1534 sfw_unpack_sid(rep
->tsr_sid
);
/* Join request: reply id, session id. */
1538 if (msg
->msg_type
== SRPC_MSG_JOIN_REQST
) {
1539 srpc_join_reqst_t
*req
= &msg
->msg_body
.join_reqst
;
1541 __swab64s(&req
->join_rpyid
);
1542 sfw_unpack_sid(req
->join_sid
);
/* Join reply: status, timeout, session id. */
1546 if (msg
->msg_type
== SRPC_MSG_JOIN_REPLY
) {
1547 srpc_join_reply_t
*rep
= &msg
->msg_body
.join_reply
;
1549 __swab32s(&rep
->join_status
);
1550 __swab32s(&rep
->join_timeout
);
1551 sfw_unpack_sid(rep
->join_sid
);
/*
 * sfw_abort_rpc - abort a framework client RPC with -EINTR.
 *
 * rpc: client RPC to abort; it must still hold at least one reference and
 *      target a framework service (both asserted).
 *
 * The abort itself is performed by srpc_abort_rpc() while holding the
 * RPC's own crpc_lock.
 */
1560 sfw_abort_rpc(srpc_client_rpc_t
*rpc
)
1562 LASSERT(atomic_read(&rpc
->crpc_refcount
) > 0);
1563 LASSERT(rpc
->crpc_service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
1565 spin_lock(&rpc
->crpc_lock
);
1566 srpc_abort_rpc(rpc
, -EINTR
);
1567 spin_unlock(&rpc
->crpc_lock
);
/*
 * sfw_post_rpc - prepare a framework client RPC for posting.
 *
 * rpc: client RPC; under its crpc_lock it is asserted to be neither closed
 *      nor aborted and not linked on any list, and the framework must not
 *      be shutting down.
 *
 * The RPC timeout is taken from the rpc_timeout module parameter.
 *
 * NOTE(review): the statement(s) between the timeout assignment and the
 * unlock are not part of this extract; presumably the RPC is posted there.
 * Confirm against the full file.
 */
1572 sfw_post_rpc(srpc_client_rpc_t
*rpc
)
1574 spin_lock(&rpc
->crpc_lock
);
1576 LASSERT(!rpc
->crpc_closed
);
1577 LASSERT(!rpc
->crpc_aborted
);
1578 LASSERT(list_empty(&rpc
->crpc_list
));
1579 LASSERT(!sfw_data
.fw_shuttingdown
);
1581 rpc
->crpc_timeout
= rpc_timeout
;
1584 spin_unlock(&rpc
->crpc_lock
);
/*
 * Table of the framework's own services: debug, query stats, make/remove
 * session, batch and test.  Only the sv_id and sv_name initialisers of
 * each entry are visible in this extract.
 *
 * The start-up and shutdown code in this file walks the table until it
 * meets an entry whose sv_name is NULL.
 * NOTE(review): that terminating entry is not visible here - confirm.
 */
1588 static srpc_service_t sfw_services
[] = {
1590 /* sv_id */ SRPC_SERVICE_DEBUG
,
1591 /* sv_name */ "debug",
1595 /* sv_id */ SRPC_SERVICE_QUERY_STAT
,
1596 /* sv_name */ "query stats",
1600 /* sv_id */ SRPC_SERVICE_MAKE_SESSION
,
1601 /* sv_name */ "make session",
1605 /* sv_id */ SRPC_SERVICE_REMOVE_SESSION
,
1606 /* sv_name */ "remove session",
1610 /* sv_id */ SRPC_SERVICE_BATCH
,
1611 /* sv_name */ "batch service",
1615 /* sv_id */ SRPC_SERVICE_TEST
,
1616 /* sv_name */ "test service",
/* Ping test: client operations, server-side service and their initialisers,
 * all defined outside this file. */
1626 extern sfw_test_client_ops_t ping_test_client
;
1627 extern srpc_service_t ping_test_service
;
1628 extern void ping_init_test_client(void);
1629 extern void ping_init_test_service(void);
/* Bulk read/write (brw) test: same four externally defined symbols. */
1631 extern sfw_test_client_ops_t brw_test_client
;
1632 extern srpc_service_t brw_test_service
;
1633 extern void brw_init_test_client(void);
1634 extern void brw_init_test_service(void);
/*
 * Body of the framework start-up routine.
 * NOTE(review): the function header, several local declarations and the
 * error-handling/return statements are not part of this extract.
 *
 * Visible steps:
 *  1. validate the session_timeout and rpc_timeout module parameters;
 *  2. zero and initialise the global sfw_data state;
 *  3. register the brw and ping tests;
 *  4. add the service of every registered test case;
 *  5. configure and add every entry of sfw_services, then reserve request
 *     buffers for it.
 */
1644 sfw_test_case_t
*tsc
;
/* Negative timeouts are reported as errors. */
1647 if (session_timeout
< 0) {
1648 CERROR("Session timeout must be non-negative: %d\n",
1653 if (rpc_timeout
< 0) {
1654 CERROR("RPC timeout must be non-negative: %d\n",
/* Zero timeouts are accepted, with a warning that nothing will expire. */
1659 if (session_timeout
== 0)
1660 CWARN("Zero session_timeout specified - test sessions never expire.\n");
1662 if (rpc_timeout
== 0)
1663 CWARN("Zero rpc_timeout specified - test RPC never expire.\n");
/* Reset the global framework state: no session, no active server RPC,
 * zero zombies, empty test / zombie-RPC / zombie-session lists. */
1665 memset(&sfw_data
, 0, sizeof(struct smoketest_framework
));
1667 sfw_data
.fw_session
= NULL
;
1668 sfw_data
.fw_active_srpc
= NULL
;
1669 spin_lock_init(&sfw_data
.fw_lock
);
1670 atomic_set(&sfw_data
.fw_nzombies
, 0);
1671 INIT_LIST_HEAD(&sfw_data
.fw_tests
);
1672 INIT_LIST_HEAD(&sfw_data
.fw_zombie_rpcs
);
1673 INIT_LIST_HEAD(&sfw_data
.fw_zombie_sessions
);
/* Initialise and register the two tests declared as extern in this file. */
1675 brw_init_test_client();
1676 brw_init_test_service();
1677 rc
= sfw_register_test(&brw_test_service
, &brw_test_client
);
1680 ping_init_test_client();
1681 ping_init_test_service();
1682 rc
= sfw_register_test(&ping_test_service
, &ping_test_client
);
/* Add the server-side service of every registered test case; -EBUSY from
 * srpc_add_service() is asserted never to happen. */
1686 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
1687 sv
= tsc
->tsc_srv_service
;
1689 rc
= srpc_add_service(sv
);
1690 LASSERT(rc
!= -EBUSY
);
1692 CWARN("Failed to add %s service: %d\n",
/* Walk sfw_services until the entry whose sv_name is NULL.  Every entry
 * gets sfw_handle_server_rpc as handler and SFW_FRWK_WI_MAX as sv_wi_total;
 * only SRPC_SERVICE_TEST gets a bulk-ready hook (sfw_bulk_ready). */
1698 for (i
= 0; ; i
++) {
1699 sv
= &sfw_services
[i
];
1700 if (sv
->sv_name
== NULL
)
1703 sv
->sv_bulk_ready
= NULL
;
1704 sv
->sv_handler
= sfw_handle_server_rpc
;
1705 sv
->sv_wi_total
= SFW_FRWK_WI_MAX
;
1706 if (sv
->sv_id
== SRPC_SERVICE_TEST
)
1707 sv
->sv_bulk_ready
= sfw_bulk_ready
;
1709 rc
= srpc_add_service(sv
);
1710 LASSERT(rc
!= -EBUSY
);
1712 CWARN("Failed to add %s service: %d\n",
1717 /* about to sfw_shutdown, no need to add buffer */
/* Reserve sv_wi_total request buffers for the service and warn when the
 * reservation falls short. */
1721 rc
= srpc_service_add_buffers(sv
, sv
->sv_wi_total
);
1723 CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
1724 sv
->sv_name
, sv
->sv_wi_total
, rc
);
1738 sfw_test_case_t
*tsc
;
1741 spin_lock(&sfw_data
.fw_lock
);
1743 sfw_data
.fw_shuttingdown
= 1;
1744 lst_wait_until(sfw_data
.fw_active_srpc
== NULL
, sfw_data
.fw_lock
,
1745 "waiting for active RPC to finish.\n");
1747 if (sfw_del_session_timer() != 0)
1748 lst_wait_until(sfw_data
.fw_session
== NULL
, sfw_data
.fw_lock
,
1749 "waiting for session timer to explode.\n");
1751 sfw_deactivate_session();
1752 lst_wait_until(atomic_read(&sfw_data
.fw_nzombies
) == 0,
1754 "waiting for %d zombie sessions to die.\n",
1755 atomic_read(&sfw_data
.fw_nzombies
));
1757 spin_unlock(&sfw_data
.fw_lock
);
1759 for (i
= 0; ; i
++) {
1760 sv
= &sfw_services
[i
];
1761 if (sv
->sv_name
== NULL
)
1764 srpc_shutdown_service(sv
);
1765 srpc_remove_service(sv
);
1768 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
1769 sv
= tsc
->tsc_srv_service
;
1770 srpc_shutdown_service(sv
);
1771 srpc_remove_service(sv
);
1774 while (!list_empty(&sfw_data
.fw_zombie_rpcs
)) {
1775 srpc_client_rpc_t
*rpc
;
1777 rpc
= list_entry(sfw_data
.fw_zombie_rpcs
.next
,
1778 srpc_client_rpc_t
, crpc_list
);
1779 list_del(&rpc
->crpc_list
);
1781 LIBCFS_FREE(rpc
, srpc_client_rpc_size(rpc
));
1784 for (i
= 0; ; i
++) {
1785 sv
= &sfw_services
[i
];
1786 if (sv
->sv_name
== NULL
)
1789 srpc_wait_service_shutdown(sv
);
1792 while (!list_empty(&sfw_data
.fw_tests
)) {
1793 tsc
= list_entry(sfw_data
.fw_tests
.next
,
1794 sfw_test_case_t
, tsc_list
);
1796 srpc_wait_service_shutdown(tsc
->tsc_srv_service
);
1798 list_del(&tsc
->tsc_list
);
1799 LIBCFS_FREE(tsc
, sizeof(*tsc
));