/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Message decoding, parsing and finalizing routines
 */
41 #define DEBUG_SUBSYSTEM S_LNET
43 #include "../../include/linux/lnet/lib-lnet.h"
46 lnet_build_unlink_event(lnet_libmd_t
*md
, lnet_event_t
*ev
)
48 memset(ev
, 0, sizeof(*ev
));
52 ev
->type
= LNET_EVENT_UNLINK
;
53 lnet_md_deconstruct(md
, &ev
->md
);
54 lnet_md2handle(&ev
->md_handle
, md
);
58 * Don't need any lock, must be called after lnet_commit_md
61 lnet_build_msg_event(lnet_msg_t
*msg
, lnet_event_kind_t ev_type
)
63 lnet_hdr_t
*hdr
= &msg
->msg_hdr
;
64 lnet_event_t
*ev
= &msg
->msg_ev
;
66 LASSERT(!msg
->msg_routing
);
70 if (ev_type
== LNET_EVENT_SEND
) {
71 /* event for active message */
72 ev
->target
.nid
= le64_to_cpu(hdr
->dest_nid
);
73 ev
->target
.pid
= le32_to_cpu(hdr
->dest_pid
);
74 ev
->initiator
.nid
= LNET_NID_ANY
;
75 ev
->initiator
.pid
= the_lnet
.ln_pid
;
76 ev
->sender
= LNET_NID_ANY
;
79 /* event for passive message */
80 ev
->target
.pid
= hdr
->dest_pid
;
81 ev
->target
.nid
= hdr
->dest_nid
;
82 ev
->initiator
.pid
= hdr
->src_pid
;
83 ev
->initiator
.nid
= hdr
->src_nid
;
84 ev
->rlength
= hdr
->payload_length
;
85 ev
->sender
= msg
->msg_from
;
86 ev
->mlength
= msg
->msg_wanted
;
87 ev
->offset
= msg
->msg_offset
;
94 case LNET_EVENT_PUT
: /* passive PUT */
95 ev
->pt_index
= hdr
->msg
.put
.ptl_index
;
96 ev
->match_bits
= hdr
->msg
.put
.match_bits
;
97 ev
->hdr_data
= hdr
->msg
.put
.hdr_data
;
100 case LNET_EVENT_GET
: /* passive GET */
101 ev
->pt_index
= hdr
->msg
.get
.ptl_index
;
102 ev
->match_bits
= hdr
->msg
.get
.match_bits
;
106 case LNET_EVENT_ACK
: /* ACK */
107 ev
->match_bits
= hdr
->msg
.ack
.match_bits
;
108 ev
->mlength
= hdr
->msg
.ack
.mlength
;
111 case LNET_EVENT_REPLY
: /* REPLY */
114 case LNET_EVENT_SEND
: /* active message */
115 if (msg
->msg_type
== LNET_MSG_PUT
) {
116 ev
->pt_index
= le32_to_cpu(hdr
->msg
.put
.ptl_index
);
117 ev
->match_bits
= le64_to_cpu(hdr
->msg
.put
.match_bits
);
118 ev
->offset
= le32_to_cpu(hdr
->msg
.put
.offset
);
120 ev
->rlength
= le32_to_cpu(hdr
->payload_length
);
121 ev
->hdr_data
= le64_to_cpu(hdr
->msg
.put
.hdr_data
);
124 LASSERT(msg
->msg_type
== LNET_MSG_GET
);
125 ev
->pt_index
= le32_to_cpu(hdr
->msg
.get
.ptl_index
);
126 ev
->match_bits
= le64_to_cpu(hdr
->msg
.get
.match_bits
);
128 ev
->rlength
= le32_to_cpu(hdr
->msg
.get
.sink_length
);
129 ev
->offset
= le32_to_cpu(hdr
->msg
.get
.src_offset
);
137 lnet_msg_commit(lnet_msg_t
*msg
, int cpt
)
139 struct lnet_msg_container
*container
= the_lnet
.ln_msg_containers
[cpt
];
140 lnet_counters_t
*counters
= the_lnet
.ln_counters
[cpt
];
142 /* routed message can be committed for both receiving and sending */
143 LASSERT(!msg
->msg_tx_committed
);
145 if (msg
->msg_sending
) {
146 LASSERT(!msg
->msg_receiving
);
148 msg
->msg_tx_cpt
= cpt
;
149 msg
->msg_tx_committed
= 1;
150 if (msg
->msg_rx_committed
) { /* routed message REPLY */
151 LASSERT(msg
->msg_onactivelist
);
155 LASSERT(!msg
->msg_sending
);
156 msg
->msg_rx_cpt
= cpt
;
157 msg
->msg_rx_committed
= 1;
160 LASSERT(!msg
->msg_onactivelist
);
161 msg
->msg_onactivelist
= 1;
162 list_add(&msg
->msg_activelist
, &container
->msc_active
);
164 counters
->msgs_alloc
++;
165 if (counters
->msgs_alloc
> counters
->msgs_max
)
166 counters
->msgs_max
= counters
->msgs_alloc
;
170 lnet_msg_decommit_tx(lnet_msg_t
*msg
, int status
)
172 lnet_counters_t
*counters
;
173 lnet_event_t
*ev
= &msg
->msg_ev
;
175 LASSERT(msg
->msg_tx_committed
);
179 counters
= the_lnet
.ln_counters
[msg
->msg_tx_cpt
];
181 default: /* routed message */
182 LASSERT(msg
->msg_routing
);
183 LASSERT(msg
->msg_rx_committed
);
184 LASSERT(ev
->type
== 0);
186 counters
->route_length
+= msg
->msg_len
;
187 counters
->route_count
++;
191 /* should have been decommitted */
192 LASSERT(!msg
->msg_rx_committed
);
193 /* overwritten while sending ACK */
194 LASSERT(msg
->msg_type
== LNET_MSG_ACK
);
195 msg
->msg_type
= LNET_MSG_PUT
; /* fix type */
198 case LNET_EVENT_SEND
:
199 LASSERT(!msg
->msg_rx_committed
);
200 if (msg
->msg_type
== LNET_MSG_PUT
)
201 counters
->send_length
+= msg
->msg_len
;
205 LASSERT(msg
->msg_rx_committed
);
206 /* overwritten while sending reply, we should never be
207 * here for optimized GET */
208 LASSERT(msg
->msg_type
== LNET_MSG_REPLY
);
209 msg
->msg_type
= LNET_MSG_GET
; /* fix type */
213 counters
->send_count
++;
215 lnet_return_tx_credits_locked(msg
);
216 msg
->msg_tx_committed
= 0;
220 lnet_msg_decommit_rx(lnet_msg_t
*msg
, int status
)
222 lnet_counters_t
*counters
;
223 lnet_event_t
*ev
= &msg
->msg_ev
;
225 LASSERT(!msg
->msg_tx_committed
); /* decommitted or never committed */
226 LASSERT(msg
->msg_rx_committed
);
231 counters
= the_lnet
.ln_counters
[msg
->msg_rx_cpt
];
234 LASSERT(ev
->type
== 0);
235 LASSERT(msg
->msg_routing
);
239 LASSERT(msg
->msg_type
== LNET_MSG_ACK
);
243 /* type is "REPLY" if it's an optimized GET on passive side,
244 * because optimized GET will never be committed for sending,
245 * so message type wouldn't be changed back to "GET" by
246 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
247 LASSERT(msg
->msg_type
== LNET_MSG_REPLY
||
248 msg
->msg_type
== LNET_MSG_GET
);
249 counters
->send_length
+= msg
->msg_wanted
;
253 LASSERT(msg
->msg_type
== LNET_MSG_PUT
);
256 case LNET_EVENT_REPLY
:
257 /* type is "GET" if it's an optimized GET on active side,
258 * see details in lnet_create_reply_msg() */
259 LASSERT(msg
->msg_type
== LNET_MSG_GET
||
260 msg
->msg_type
== LNET_MSG_REPLY
);
264 counters
->recv_count
++;
265 if (ev
->type
== LNET_EVENT_PUT
|| ev
->type
== LNET_EVENT_REPLY
)
266 counters
->recv_length
+= msg
->msg_wanted
;
269 lnet_return_rx_credits_locked(msg
);
270 msg
->msg_rx_committed
= 0;
274 lnet_msg_decommit(lnet_msg_t
*msg
, int cpt
, int status
)
278 LASSERT(msg
->msg_tx_committed
|| msg
->msg_rx_committed
);
279 LASSERT(msg
->msg_onactivelist
);
281 if (msg
->msg_tx_committed
) { /* always decommit for sending first */
282 LASSERT(cpt
== msg
->msg_tx_cpt
);
283 lnet_msg_decommit_tx(msg
, status
);
286 if (msg
->msg_rx_committed
) {
287 /* forwarding msg committed for both receiving and sending */
288 if (cpt
!= msg
->msg_rx_cpt
) {
289 lnet_net_unlock(cpt
);
290 cpt2
= msg
->msg_rx_cpt
;
293 lnet_msg_decommit_rx(msg
, status
);
296 list_del(&msg
->msg_activelist
);
297 msg
->msg_onactivelist
= 0;
299 the_lnet
.ln_counters
[cpt2
]->msgs_alloc
--;
302 lnet_net_unlock(cpt2
);
308 lnet_msg_attach_md(lnet_msg_t
*msg
, lnet_libmd_t
*md
,
309 unsigned int offset
, unsigned int mlen
)
311 /* NB: @offset and @len are only useful for receiving */
312 /* Here, we attach the MD on lnet_msg and mark it busy and
313 * decrementing its threshold. Come what may, the lnet_msg "owns"
314 * the MD until a call to lnet_msg_detach_md or lnet_finalize()
315 * signals completion. */
316 LASSERT(!msg
->msg_routing
);
319 if (msg
->msg_receiving
) { /* committed for receiving */
320 msg
->msg_offset
= offset
;
321 msg
->msg_wanted
= mlen
;
325 if (md
->md_threshold
!= LNET_MD_THRESH_INF
) {
326 LASSERT(md
->md_threshold
> 0);
330 /* build umd in event */
331 lnet_md2handle(&msg
->msg_ev
.md_handle
, md
);
332 lnet_md_deconstruct(md
, &msg
->msg_ev
.md
);
336 lnet_msg_detach_md(lnet_msg_t
*msg
, int status
)
338 lnet_libmd_t
*md
= msg
->msg_md
;
341 /* Now it's safe to drop my caller's ref */
343 LASSERT(md
->md_refcount
>= 0);
345 unlink
= lnet_md_unlinkable(md
);
346 if (md
->md_eq
!= NULL
) {
347 msg
->msg_ev
.status
= status
;
348 msg
->msg_ev
.unlinked
= unlink
;
349 lnet_eq_enqueue_event(md
->md_eq
, &msg
->msg_ev
);
359 lnet_complete_msg_locked(lnet_msg_t
*msg
, int cpt
)
361 lnet_handle_wire_t ack_wmd
;
363 int status
= msg
->msg_ev
.status
;
365 LASSERT(msg
->msg_onactivelist
);
367 if (status
== 0 && msg
->msg_ack
) {
368 /* Only send an ACK if the PUT completed successfully */
370 lnet_msg_decommit(msg
, cpt
, 0);
373 lnet_net_unlock(cpt
);
375 LASSERT(msg
->msg_ev
.type
== LNET_EVENT_PUT
);
376 LASSERT(!msg
->msg_routing
);
378 ack_wmd
= msg
->msg_hdr
.msg
.put
.ack_wmd
;
380 lnet_prep_send(msg
, LNET_MSG_ACK
, msg
->msg_ev
.initiator
, 0, 0);
382 msg
->msg_hdr
.msg
.ack
.dst_wmd
= ack_wmd
;
383 msg
->msg_hdr
.msg
.ack
.match_bits
= msg
->msg_ev
.match_bits
;
384 msg
->msg_hdr
.msg
.ack
.mlength
= cpu_to_le32(msg
->msg_ev
.mlength
);
386 /* NB: we probably want to use NID of msg::msg_from as 3rd
387 * parameter (router NID) if it's routed message */
388 rc
= lnet_send(msg
->msg_ev
.target
.nid
, msg
, LNET_NID_ANY
);
392 * NB: message is committed for sending, we should return
393 * on success because LND will finalize this message later.
395 * Also, there is possibility that message is committed for
396 * sending and also failed before delivering to LND,
397 * i.e: ENOMEM, in that case we can't fall through either
398 * because CPT for sending can be different with CPT for
399 * receiving, so we should return back to lnet_finalize()
400 * to make sure we are locking the correct partition.
404 } else if (status
== 0 && /* OK so far */
405 (msg
->msg_routing
&& !msg
->msg_sending
)) {
407 LASSERT(!msg
->msg_receiving
); /* called back recv already */
408 lnet_net_unlock(cpt
);
410 rc
= lnet_send(LNET_NID_ANY
, msg
, LNET_NID_ANY
);
414 * NB: message is committed for sending, we should return
415 * on success because LND will finalize this message later.
417 * Also, there is possibility that message is committed for
418 * sending and also failed before delivering to LND,
419 * i.e: ENOMEM, in that case we can't fall through either:
420 * - The rule is message must decommit for sending first if
421 * the it's committed for both sending and receiving
422 * - CPT for sending can be different with CPT for receiving,
423 * so we should return back to lnet_finalize() to make
424 * sure we are locking the correct partition.
429 lnet_msg_decommit(msg
, cpt
, status
);
430 lnet_msg_free_locked(msg
);
435 lnet_finalize(lnet_ni_t
*ni
, lnet_msg_t
*msg
, int status
)
437 struct lnet_msg_container
*container
;
443 LASSERT(!in_interrupt());
448 CDEBUG(D_WARNING
, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
449 lnet_msgtyp2str(msg
->msg_type
), libcfs_id2str(msg
->msg_target
),
450 msg
->msg_target_is_router
? "t" : "",
451 msg
->msg_routing
? "X" : "",
452 msg
->msg_ack
? "A" : "",
453 msg
->msg_sending
? "S" : "",
454 msg
->msg_receiving
? "R" : "",
455 msg
->msg_delayed
? "d" : "",
456 msg
->msg_txcredit
? "C" : "",
457 msg
->msg_peertxcredit
? "c" : "",
458 msg
->msg_rtrcredit
? "F" : "",
459 msg
->msg_peerrtrcredit
? "f" : "",
460 msg
->msg_onactivelist
? "!" : "",
461 msg
->msg_txpeer
== NULL
? "<none>" : libcfs_nid2str(msg
->msg_txpeer
->lp_nid
),
462 msg
->msg_rxpeer
== NULL
? "<none>" : libcfs_nid2str(msg
->msg_rxpeer
->lp_nid
));
464 msg
->msg_ev
.status
= status
;
466 if (msg
->msg_md
!= NULL
) {
467 cpt
= lnet_cpt_of_cookie(msg
->msg_md
->md_lh
.lh_cookie
);
470 lnet_msg_detach_md(msg
, status
);
471 lnet_res_unlock(cpt
);
476 if (!msg
->msg_tx_committed
&& !msg
->msg_rx_committed
) {
477 /* not committed to network yet */
478 LASSERT(!msg
->msg_onactivelist
);
484 * NB: routed message can be committed for both receiving and sending,
485 * we should finalize in LIFO order and keep counters correct.
486 * (finalize sending first then finalize receiving)
488 cpt
= msg
->msg_tx_committed
? msg
->msg_tx_cpt
: msg
->msg_rx_cpt
;
491 container
= the_lnet
.ln_msg_containers
[cpt
];
492 list_add_tail(&msg
->msg_list
, &container
->msc_finalizing
);
494 /* Recursion breaker. Don't complete the message here if I am (or
495 * enough other threads are) already completing messages */
498 for (i
= 0; i
< container
->msc_nfinalizers
; i
++) {
499 if (container
->msc_finalizers
[i
] == current
)
502 if (my_slot
< 0 && container
->msc_finalizers
[i
] == NULL
)
506 if (i
< container
->msc_nfinalizers
|| my_slot
< 0) {
507 lnet_net_unlock(cpt
);
511 container
->msc_finalizers
[my_slot
] = current
;
513 while (!list_empty(&container
->msc_finalizing
)) {
514 msg
= list_entry(container
->msc_finalizing
.next
,
515 lnet_msg_t
, msg_list
);
517 list_del(&msg
->msg_list
);
519 /* NB drops and regains the lnet lock if it actually does
520 * anything, so my finalizing friends can chomp along too */
521 rc
= lnet_complete_msg_locked(msg
, cpt
);
526 container
->msc_finalizers
[my_slot
] = NULL
;
527 lnet_net_unlock(cpt
);
532 EXPORT_SYMBOL(lnet_finalize
);
535 lnet_msg_container_cleanup(struct lnet_msg_container
*container
)
539 if (container
->msc_init
== 0)
542 while (!list_empty(&container
->msc_active
)) {
543 lnet_msg_t
*msg
= list_entry(container
->msc_active
.next
,
544 lnet_msg_t
, msg_activelist
);
546 LASSERT(msg
->msg_onactivelist
);
547 msg
->msg_onactivelist
= 0;
548 list_del(&msg
->msg_activelist
);
554 CERROR("%d active msg on exit\n", count
);
556 if (container
->msc_finalizers
!= NULL
) {
557 LIBCFS_FREE(container
->msc_finalizers
,
558 container
->msc_nfinalizers
*
559 sizeof(*container
->msc_finalizers
));
560 container
->msc_finalizers
= NULL
;
562 #ifdef LNET_USE_LIB_FREELIST
563 lnet_freelist_fini(&container
->msc_freelist
);
565 container
->msc_init
= 0;
569 lnet_msg_container_setup(struct lnet_msg_container
*container
, int cpt
)
573 container
->msc_init
= 1;
575 INIT_LIST_HEAD(&container
->msc_active
);
576 INIT_LIST_HEAD(&container
->msc_finalizing
);
578 #ifdef LNET_USE_LIB_FREELIST
579 memset(&container
->msc_freelist
, 0, sizeof(lnet_freelist_t
));
581 rc
= lnet_freelist_init(&container
->msc_freelist
,
582 LNET_FL_MAX_MSGS
, sizeof(lnet_msg_t
));
584 CERROR("Failed to init freelist for message container\n");
585 lnet_msg_container_cleanup(container
);
592 container
->msc_nfinalizers
= cfs_cpt_weight(lnet_cpt_table(), cpt
);
594 LIBCFS_CPT_ALLOC(container
->msc_finalizers
, lnet_cpt_table(), cpt
,
595 container
->msc_nfinalizers
*
596 sizeof(*container
->msc_finalizers
));
598 if (container
->msc_finalizers
== NULL
) {
599 CERROR("Failed to allocate message finalizers\n");
600 lnet_msg_container_cleanup(container
);
608 lnet_msg_containers_destroy(void)
610 struct lnet_msg_container
*container
;
613 if (the_lnet
.ln_msg_containers
== NULL
)
616 cfs_percpt_for_each(container
, i
, the_lnet
.ln_msg_containers
)
617 lnet_msg_container_cleanup(container
);
619 cfs_percpt_free(the_lnet
.ln_msg_containers
);
620 the_lnet
.ln_msg_containers
= NULL
;
624 lnet_msg_containers_create(void)
626 struct lnet_msg_container
*container
;
630 the_lnet
.ln_msg_containers
= cfs_percpt_alloc(lnet_cpt_table(),
633 if (the_lnet
.ln_msg_containers
== NULL
) {
634 CERROR("Failed to allocate cpu-partition data for network\n");
638 cfs_percpt_for_each(container
, i
, the_lnet
.ln_msg_containers
) {
639 rc
= lnet_msg_container_setup(container
, i
);
641 lnet_msg_containers_destroy();