staging: lustre: Ignore hops if not explicitly set
[deliverable/linux.git] drivers/staging/lustre/lnet/lnet/lib-move.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2015, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lnet/lnet/lib-move.c
37 *
38 * Data movement routines
39 */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42
43 #include "../../include/linux/lnet/lib-lnet.h"
44
45 /** lnet message has credit and can be submitted to lnd for send/receive */
46 #define LNET_CREDIT_OK 0
47 /** lnet message is waiting for credit */
48 #define LNET_CREDIT_WAIT 1
49
50 static int local_nid_dist_zero = 1;
51 module_param(local_nid_dist_zero, int, 0444);
52 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
53
54 int
55 lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
56 {
57 lnet_test_peer_t *tp;
58 struct list_head *el;
59 struct list_head *next;
60 struct list_head cull;
61
62 /* NB: use lnet_net_lock(0) to serialize operations on test peers */
63 if (threshold) {
64 /* Adding a new entry */
65 LIBCFS_ALLOC(tp, sizeof(*tp));
66 if (!tp)
67 return -ENOMEM;
68
69 tp->tp_nid = nid;
70 tp->tp_threshold = threshold;
71
72 lnet_net_lock(0);
73 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
74 lnet_net_unlock(0);
75 return 0;
76 }
77
78 /* removing entries */
79 INIT_LIST_HEAD(&cull);
80
81 lnet_net_lock(0);
82
83 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
84 tp = list_entry(el, lnet_test_peer_t, tp_list);
85
86 if (!tp->tp_threshold || /* needs culling anyway */
87 nid == LNET_NID_ANY || /* removing all entries */
88 tp->tp_nid == nid) { /* matched this one */
89 list_del(&tp->tp_list);
90 list_add(&tp->tp_list, &cull);
91 }
92 }
93
94 lnet_net_unlock(0);
95
96 while (!list_empty(&cull)) {
97 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
98
99 list_del(&tp->tp_list);
100 LIBCFS_FREE(tp, sizeof(*tp));
101 }
102 return 0;
103 }
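/*
 * Editorial usage sketch (not part of the original file): lnet_fail_nid()
 * maintains the test-peer list that fail_peer() below consults to simulate
 * message loss.  Assuming a caller holding a valid lnet_nid_t 'nid':
 *
 *	lnet_fail_nid(nid, 3);			fail the next 3 messages
 *						exchanged with 'nid'
 *	lnet_fail_nid(nid, LNET_MD_THRESH_INF);	fail them indefinitely
 *	lnet_fail_nid(nid, 0);			remove the entry again
 *	lnet_fail_nid(LNET_NID_ANY, 0);		clear the whole list
 */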
104
105 static int
106 fail_peer(lnet_nid_t nid, int outgoing)
107 {
108 lnet_test_peer_t *tp;
109 struct list_head *el;
110 struct list_head *next;
111 struct list_head cull;
112 int fail = 0;
113
114 INIT_LIST_HEAD(&cull);
115
116 /* NB: use lnet_net_lock(0) to serialize operations on test peers */
117 lnet_net_lock(0);
118
119 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
120 tp = list_entry(el, lnet_test_peer_t, tp_list);
121
122 if (!tp->tp_threshold) {
123 /* zombie entry */
124 if (outgoing) {
125 /*
126 * only cull zombies on outgoing tests,
127 * since we may be at interrupt priority on
128 * incoming messages.
129 */
130 list_del(&tp->tp_list);
131 list_add(&tp->tp_list, &cull);
132 }
133 continue;
134 }
135
136 if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
137 nid == tp->tp_nid) { /* fail this peer */
138 fail = 1;
139
140 if (tp->tp_threshold != LNET_MD_THRESH_INF) {
141 tp->tp_threshold--;
142 if (outgoing &&
143 !tp->tp_threshold) {
144 /* see above */
145 list_del(&tp->tp_list);
146 list_add(&tp->tp_list, &cull);
147 }
148 }
149 break;
150 }
151 }
152
153 lnet_net_unlock(0);
154
155 while (!list_empty(&cull)) {
156 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
157 list_del(&tp->tp_list);
158
159 LIBCFS_FREE(tp, sizeof(*tp));
160 }
161
162 return fail;
163 }
164
165 unsigned int
166 lnet_iov_nob(unsigned int niov, struct kvec *iov)
167 {
168 unsigned int nob = 0;
169
170 while (niov-- > 0)
171 nob += (iov++)->iov_len;
172
173 return nob;
174 }
175 EXPORT_SYMBOL(lnet_iov_nob);
176
177 void
178 lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
179 unsigned int nsiov, struct kvec *siov, unsigned int soffset,
180 unsigned int nob)
181 {
182 /* NB diov, siov are READ-ONLY */
183 unsigned int this_nob;
184
185 if (!nob)
186 return;
187
188 /* skip complete frags before 'doffset' */
189 LASSERT(ndiov > 0);
190 while (doffset >= diov->iov_len) {
191 doffset -= diov->iov_len;
192 diov++;
193 ndiov--;
194 LASSERT(ndiov > 0);
195 }
196
197 /* skip complete frags before 'soffset' */
198 LASSERT(nsiov > 0);
199 while (soffset >= siov->iov_len) {
200 soffset -= siov->iov_len;
201 siov++;
202 nsiov--;
203 LASSERT(nsiov > 0);
204 }
205
206 do {
207 LASSERT(ndiov > 0);
208 LASSERT(nsiov > 0);
209 this_nob = min(diov->iov_len - doffset,
210 siov->iov_len - soffset);
211 this_nob = min(this_nob, nob);
212
213 memcpy((char *)diov->iov_base + doffset,
214 (char *)siov->iov_base + soffset, this_nob);
215 nob -= this_nob;
216
217 if (diov->iov_len > doffset + this_nob) {
218 doffset += this_nob;
219 } else {
220 diov++;
221 ndiov--;
222 doffset = 0;
223 }
224
225 if (siov->iov_len > soffset + this_nob) {
226 soffset += this_nob;
227 } else {
228 siov++;
229 nsiov--;
230 soffset = 0;
231 }
232 } while (nob > 0);
233 }
234 EXPORT_SYMBOL(lnet_copy_iov2iov);
235
236 int
237 lnet_extract_iov(int dst_niov, struct kvec *dst,
238 int src_niov, struct kvec *src,
239 unsigned int offset, unsigned int len)
240 {
241 /*
242 * Initialise 'dst' to the subset of 'src' starting at 'offset',
243 * for exactly 'len' bytes, and return the number of entries.
244 * NB not destructive to 'src'
245 */
246 unsigned int frag_len;
247 unsigned int niov;
248
249 if (!len) /* no data => */
250 return 0; /* no frags */
251
252 LASSERT(src_niov > 0);
253 while (offset >= src->iov_len) { /* skip initial frags */
254 offset -= src->iov_len;
255 src_niov--;
256 src++;
257 LASSERT(src_niov > 0);
258 }
259
260 niov = 1;
261 for (;;) {
262 LASSERT(src_niov > 0);
263 LASSERT((int)niov <= dst_niov);
264
265 frag_len = src->iov_len - offset;
266 dst->iov_base = ((char *)src->iov_base) + offset;
267
268 if (len <= frag_len) {
269 dst->iov_len = len;
270 return niov;
271 }
272
273 dst->iov_len = frag_len;
274
275 len -= frag_len;
276 dst++;
277 src++;
278 niov++;
279 src_niov--;
280 offset = 0;
281 }
282 }
283 EXPORT_SYMBOL(lnet_extract_iov);
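/*
 * Editorial sketch (not in the original source): a hypothetical caller that
 * wants a kvec describing bytes [offset, offset + len) of an existing
 * scatter list 'src' with 'src_niov' fragments might do:
 *
 *	struct kvec frags[LNET_MAX_IOV];
 *	int nfrags;
 *
 *	nfrags = lnet_extract_iov(LNET_MAX_IOV, frags,
 *				  src_niov, src, offset, len);
 *
 * frags[0..nfrags-1] then aliases the requested byte range; 'src' itself is
 * left untouched, and offset + len must lie within it (see the LASSERTs).
 */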
284
285 unsigned int
286 lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
287 {
288 unsigned int nob = 0;
289
290 while (niov-- > 0)
291 nob += (kiov++)->kiov_len;
292
293 return nob;
294 }
295 EXPORT_SYMBOL(lnet_kiov_nob);
296
297 void
298 lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
299 unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
300 unsigned int nob)
301 {
302 /* NB diov, siov are READ-ONLY */
303 unsigned int this_nob;
304 char *daddr = NULL;
305 char *saddr = NULL;
306
307 if (!nob)
308 return;
309
310 LASSERT(!in_interrupt());
311
312 LASSERT(ndiov > 0);
313 while (doffset >= diov->kiov_len) {
314 doffset -= diov->kiov_len;
315 diov++;
316 ndiov--;
317 LASSERT(ndiov > 0);
318 }
319
320 LASSERT(nsiov > 0);
321 while (soffset >= siov->kiov_len) {
322 soffset -= siov->kiov_len;
323 siov++;
324 nsiov--;
325 LASSERT(nsiov > 0);
326 }
327
328 do {
329 LASSERT(ndiov > 0);
330 LASSERT(nsiov > 0);
331 this_nob = min(diov->kiov_len - doffset,
332 siov->kiov_len - soffset);
333 this_nob = min(this_nob, nob);
334
335 if (!daddr)
336 daddr = ((char *)kmap(diov->kiov_page)) +
337 diov->kiov_offset + doffset;
338 if (!saddr)
339 saddr = ((char *)kmap(siov->kiov_page)) +
340 siov->kiov_offset + soffset;
341
342 /*
343 * Vanishing risk of kmap deadlock when mapping 2 pages.
344 	 * However, in practice at least one of the kiovs will consist of
345 	 * already-mapped kernel pages, so the map/unmap calls are no-ops.
346 */
347 memcpy(daddr, saddr, this_nob);
348 nob -= this_nob;
349
350 if (diov->kiov_len > doffset + this_nob) {
351 daddr += this_nob;
352 doffset += this_nob;
353 } else {
354 kunmap(diov->kiov_page);
355 daddr = NULL;
356 diov++;
357 ndiov--;
358 doffset = 0;
359 }
360
361 if (siov->kiov_len > soffset + this_nob) {
362 saddr += this_nob;
363 soffset += this_nob;
364 } else {
365 kunmap(siov->kiov_page);
366 saddr = NULL;
367 siov++;
368 nsiov--;
369 soffset = 0;
370 }
371 } while (nob > 0);
372
373 if (daddr)
374 kunmap(diov->kiov_page);
375 if (saddr)
376 kunmap(siov->kiov_page);
377 }
378 EXPORT_SYMBOL(lnet_copy_kiov2kiov);
379
380 void
381 lnet_copy_kiov2iov(unsigned int niov, struct kvec *iov, unsigned int iovoffset,
382 unsigned int nkiov, lnet_kiov_t *kiov,
383 unsigned int kiovoffset, unsigned int nob)
384 {
385 /* NB iov, kiov are READ-ONLY */
386 unsigned int this_nob;
387 char *addr = NULL;
388
389 if (!nob)
390 return;
391
392 LASSERT(!in_interrupt());
393
394 LASSERT(niov > 0);
395 while (iovoffset >= iov->iov_len) {
396 iovoffset -= iov->iov_len;
397 iov++;
398 niov--;
399 LASSERT(niov > 0);
400 }
401
402 LASSERT(nkiov > 0);
403 while (kiovoffset >= kiov->kiov_len) {
404 kiovoffset -= kiov->kiov_len;
405 kiov++;
406 nkiov--;
407 LASSERT(nkiov > 0);
408 }
409
410 do {
411 LASSERT(niov > 0);
412 LASSERT(nkiov > 0);
413 this_nob = min(iov->iov_len - iovoffset,
414 (__kernel_size_t) kiov->kiov_len - kiovoffset);
415 this_nob = min(this_nob, nob);
416
417 if (!addr)
418 addr = ((char *)kmap(kiov->kiov_page)) +
419 kiov->kiov_offset + kiovoffset;
420
421 memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
422 nob -= this_nob;
423
424 if (iov->iov_len > iovoffset + this_nob) {
425 iovoffset += this_nob;
426 } else {
427 iov++;
428 niov--;
429 iovoffset = 0;
430 }
431
432 if (kiov->kiov_len > kiovoffset + this_nob) {
433 addr += this_nob;
434 kiovoffset += this_nob;
435 } else {
436 kunmap(kiov->kiov_page);
437 addr = NULL;
438 kiov++;
439 nkiov--;
440 kiovoffset = 0;
441 }
442
443 } while (nob > 0);
444
445 if (addr)
446 kunmap(kiov->kiov_page);
447 }
448 EXPORT_SYMBOL(lnet_copy_kiov2iov);
449
450 void
451 lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
452 unsigned int kiovoffset, unsigned int niov,
453 struct kvec *iov, unsigned int iovoffset,
454 unsigned int nob)
455 {
456 /* NB kiov, iov are READ-ONLY */
457 unsigned int this_nob;
458 char *addr = NULL;
459
460 if (!nob)
461 return;
462
463 LASSERT(!in_interrupt());
464
465 LASSERT(nkiov > 0);
466 while (kiovoffset >= kiov->kiov_len) {
467 kiovoffset -= kiov->kiov_len;
468 kiov++;
469 nkiov--;
470 LASSERT(nkiov > 0);
471 }
472
473 LASSERT(niov > 0);
474 while (iovoffset >= iov->iov_len) {
475 iovoffset -= iov->iov_len;
476 iov++;
477 niov--;
478 LASSERT(niov > 0);
479 }
480
481 do {
482 LASSERT(nkiov > 0);
483 LASSERT(niov > 0);
484 this_nob = min((__kernel_size_t) kiov->kiov_len - kiovoffset,
485 iov->iov_len - iovoffset);
486 this_nob = min(this_nob, nob);
487
488 if (!addr)
489 addr = ((char *)kmap(kiov->kiov_page)) +
490 kiov->kiov_offset + kiovoffset;
491
492 memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
493 nob -= this_nob;
494
495 if (kiov->kiov_len > kiovoffset + this_nob) {
496 addr += this_nob;
497 kiovoffset += this_nob;
498 } else {
499 kunmap(kiov->kiov_page);
500 addr = NULL;
501 kiov++;
502 nkiov--;
503 kiovoffset = 0;
504 }
505
506 if (iov->iov_len > iovoffset + this_nob) {
507 iovoffset += this_nob;
508 } else {
509 iov++;
510 niov--;
511 iovoffset = 0;
512 }
513 } while (nob > 0);
514
515 if (addr)
516 kunmap(kiov->kiov_page);
517 }
518 EXPORT_SYMBOL(lnet_copy_iov2kiov);
519
520 int
521 lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
522 int src_niov, lnet_kiov_t *src,
523 unsigned int offset, unsigned int len)
524 {
525 /*
526 * Initialise 'dst' to the subset of 'src' starting at 'offset',
527 * for exactly 'len' bytes, and return the number of entries.
528 * NB not destructive to 'src'
529 */
530 unsigned int frag_len;
531 unsigned int niov;
532
533 if (!len) /* no data => */
534 return 0; /* no frags */
535
536 LASSERT(src_niov > 0);
537 while (offset >= src->kiov_len) { /* skip initial frags */
538 offset -= src->kiov_len;
539 src_niov--;
540 src++;
541 LASSERT(src_niov > 0);
542 }
543
544 niov = 1;
545 for (;;) {
546 LASSERT(src_niov > 0);
547 LASSERT((int)niov <= dst_niov);
548
549 frag_len = src->kiov_len - offset;
550 dst->kiov_page = src->kiov_page;
551 dst->kiov_offset = src->kiov_offset + offset;
552
553 if (len <= frag_len) {
554 dst->kiov_len = len;
555 LASSERT(dst->kiov_offset + dst->kiov_len
556 <= PAGE_CACHE_SIZE);
557 return niov;
558 }
559
560 dst->kiov_len = frag_len;
561 LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);
562
563 len -= frag_len;
564 dst++;
565 src++;
566 niov++;
567 src_niov--;
568 offset = 0;
569 }
570 }
571 EXPORT_SYMBOL(lnet_extract_kiov);
572
573 static void
574 lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
575 unsigned int offset, unsigned int mlen, unsigned int rlen)
576 {
577 unsigned int niov = 0;
578 struct kvec *iov = NULL;
579 lnet_kiov_t *kiov = NULL;
580 int rc;
581
582 LASSERT(!in_interrupt());
583 LASSERT(!mlen || msg);
584
585 if (msg) {
586 LASSERT(msg->msg_receiving);
587 LASSERT(!msg->msg_sending);
588 LASSERT(rlen == msg->msg_len);
589 LASSERT(mlen <= msg->msg_len);
590 LASSERT(msg->msg_offset == offset);
591 LASSERT(msg->msg_wanted == mlen);
592
593 msg->msg_receiving = 0;
594
595 if (mlen) {
596 niov = msg->msg_niov;
597 iov = msg->msg_iov;
598 kiov = msg->msg_kiov;
599
600 LASSERT(niov > 0);
601 LASSERT(!iov != !kiov);
602 }
603 }
604
605 rc = ni->ni_lnd->lnd_recv(ni, private, msg, delayed,
606 niov, iov, kiov, offset, mlen, rlen);
607 if (rc < 0)
608 lnet_finalize(ni, msg, rc);
609 }
610
611 static void
612 lnet_setpayloadbuffer(lnet_msg_t *msg)
613 {
614 lnet_libmd_t *md = msg->msg_md;
615
616 LASSERT(msg->msg_len > 0);
617 LASSERT(!msg->msg_routing);
618 LASSERT(md);
619 LASSERT(!msg->msg_niov);
620 LASSERT(!msg->msg_iov);
621 LASSERT(!msg->msg_kiov);
622
623 msg->msg_niov = md->md_niov;
624 if (md->md_options & LNET_MD_KIOV)
625 msg->msg_kiov = md->md_iov.kiov;
626 else
627 msg->msg_iov = md->md_iov.iov;
628 }
629
630 void
631 lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
632 unsigned int offset, unsigned int len)
633 {
634 msg->msg_type = type;
635 msg->msg_target = target;
636 msg->msg_len = len;
637 msg->msg_offset = offset;
638
639 if (len)
640 lnet_setpayloadbuffer(msg);
641
642 memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
643 msg->msg_hdr.type = cpu_to_le32(type);
644 msg->msg_hdr.dest_nid = cpu_to_le64(target.nid);
645 msg->msg_hdr.dest_pid = cpu_to_le32(target.pid);
646 /* src_nid will be set later */
647 msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid);
648 msg->msg_hdr.payload_length = cpu_to_le32(len);
649 }
650
651 static void
652 lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
653 {
654 void *priv = msg->msg_private;
655 int rc;
656
657 LASSERT(!in_interrupt());
658 LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
659 (msg->msg_txcredit && msg->msg_peertxcredit));
660
661 rc = ni->ni_lnd->lnd_send(ni, priv, msg);
662 if (rc < 0)
663 lnet_finalize(ni, msg, rc);
664 }
665
666 static int
667 lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
668 {
669 int rc;
670
671 LASSERT(!msg->msg_sending);
672 LASSERT(msg->msg_receiving);
673 LASSERT(!msg->msg_rx_ready_delay);
674 LASSERT(ni->ni_lnd->lnd_eager_recv);
675
676 msg->msg_rx_ready_delay = 1;
677 rc = ni->ni_lnd->lnd_eager_recv(ni, msg->msg_private, msg,
678 &msg->msg_private);
679 if (rc) {
680 CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
681 libcfs_nid2str(msg->msg_rxpeer->lp_nid),
682 libcfs_id2str(msg->msg_target), rc);
683 LASSERT(rc < 0); /* required by my callers */
684 }
685
686 return rc;
687 }
688
689 /* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
690 static void
691 lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
692 {
693 unsigned long last_alive = 0;
694
695 LASSERT(lnet_peer_aliveness_enabled(lp));
696 LASSERT(ni->ni_lnd->lnd_query);
697
698 lnet_net_unlock(lp->lp_cpt);
699 ni->ni_lnd->lnd_query(ni, lp->lp_nid, &last_alive);
700 lnet_net_lock(lp->lp_cpt);
701
702 lp->lp_last_query = cfs_time_current();
703
704 if (last_alive) /* NI has updated timestamp */
705 lp->lp_last_alive = last_alive;
706 }
707
708 /* NB: always called with lnet_net_lock held */
709 static inline int
710 lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
711 {
712 int alive;
713 unsigned long deadline;
714
715 LASSERT(lnet_peer_aliveness_enabled(lp));
716
717 /* Trust lnet_notify() if it has more recent aliveness news, but
718 * ignore the initial assumed death (see lnet_peers_start_down()).
719 */
720 if (!lp->lp_alive && lp->lp_alive_count > 0 &&
721 cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
722 return 0;
723
724 deadline = cfs_time_add(lp->lp_last_alive,
725 cfs_time_seconds(lp->lp_ni->ni_peertimeout));
726 alive = cfs_time_after(deadline, now);
727
728 	/* Update stale lp_alive, except for routers that are assumed dead
729 	 * initially: the router checker will update their aliveness in that
730 	 * case, and lp_last_alive at peer creation is only an assumed value.
731 */
732 if (alive && !lp->lp_alive &&
733 !(lnet_isrouter(lp) && !lp->lp_alive_count))
734 lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
735
736 return alive;
737 }
738
739 /*
740 * NB: returns 1 when alive, 0 when dead, negative when error;
741 * may drop the lnet_net_lock
742 */
743 static int
744 lnet_peer_alive_locked(lnet_peer_t *lp)
745 {
746 unsigned long now = cfs_time_current();
747
748 if (!lnet_peer_aliveness_enabled(lp))
749 return -ENODEV;
750
751 if (lnet_peer_is_alive(lp, now))
752 return 1;
753
754 /*
755 * Peer appears dead, but we should avoid frequent NI queries (at
756 * most once per lnet_queryinterval seconds).
757 */
758 if (lp->lp_last_query) {
759 static const int lnet_queryinterval = 1;
760
761 unsigned long next_query =
762 cfs_time_add(lp->lp_last_query,
763 cfs_time_seconds(lnet_queryinterval));
764
765 if (time_before(now, next_query)) {
766 if (lp->lp_alive)
767 CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
768 libcfs_nid2str(lp->lp_nid),
769 (int)now, (int)next_query,
770 lnet_queryinterval,
771 lp->lp_ni->ni_peertimeout);
772 return 0;
773 }
774 }
775
776 /* query NI for latest aliveness news */
777 lnet_ni_query_locked(lp->lp_ni, lp);
778
779 if (lnet_peer_is_alive(lp, now))
780 return 1;
781
782 lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
783 return 0;
784 }
785
786 /**
787 * \param msg The message to be sent.
788 * \param do_send True if lnet_ni_send() should be called in this function.
789 * lnet_send() is going to lnet_net_unlock immediately after this, so
790 * it sets do_send FALSE and I don't do the unlock/send/lock bit.
791 *
792 * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
793 * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
794 * \retval -EHOSTUNREACH If the next hop of the message appears dead.
795 * \retval -ECANCELED If the MD of the message has been unlinked.
796 */
797 static int
798 lnet_post_send_locked(lnet_msg_t *msg, int do_send)
799 {
800 lnet_peer_t *lp = msg->msg_txpeer;
801 lnet_ni_t *ni = lp->lp_ni;
802 int cpt = msg->msg_tx_cpt;
803 struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];
804
805 /* non-lnet_send() callers have checked before */
806 LASSERT(!do_send || msg->msg_tx_delayed);
807 LASSERT(!msg->msg_receiving);
808 LASSERT(msg->msg_tx_committed);
809
810 /* NB 'lp' is always the next hop */
811 if (!(msg->msg_target.pid & LNET_PID_USERFLAG) &&
812 !lnet_peer_alive_locked(lp)) {
813 the_lnet.ln_counters[cpt]->drop_count++;
814 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
815 lnet_net_unlock(cpt);
816
817 CNETERR("Dropping message for %s: peer not alive\n",
818 libcfs_id2str(msg->msg_target));
819 if (do_send)
820 lnet_finalize(ni, msg, -EHOSTUNREACH);
821
822 lnet_net_lock(cpt);
823 return -EHOSTUNREACH;
824 }
825
826 if (msg->msg_md &&
827 (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED)) {
828 lnet_net_unlock(cpt);
829
830 CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
831 libcfs_id2str(msg->msg_target));
832 if (do_send)
833 lnet_finalize(ni, msg, -ECANCELED);
834
835 lnet_net_lock(cpt);
836 return -ECANCELED;
837 }
838
839 if (!msg->msg_peertxcredit) {
840 LASSERT((lp->lp_txcredits < 0) ==
841 !list_empty(&lp->lp_txq));
842
843 msg->msg_peertxcredit = 1;
844 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
845 lp->lp_txcredits--;
846
847 if (lp->lp_txcredits < lp->lp_mintxcredits)
848 lp->lp_mintxcredits = lp->lp_txcredits;
849
850 if (lp->lp_txcredits < 0) {
851 msg->msg_tx_delayed = 1;
852 list_add_tail(&msg->msg_list, &lp->lp_txq);
853 return LNET_CREDIT_WAIT;
854 }
855 }
856
857 if (!msg->msg_txcredit) {
858 LASSERT((tq->tq_credits < 0) ==
859 !list_empty(&tq->tq_delayed));
860
861 msg->msg_txcredit = 1;
862 tq->tq_credits--;
863
864 if (tq->tq_credits < tq->tq_credits_min)
865 tq->tq_credits_min = tq->tq_credits;
866
867 if (tq->tq_credits < 0) {
868 msg->msg_tx_delayed = 1;
869 list_add_tail(&msg->msg_list, &tq->tq_delayed);
870 return LNET_CREDIT_WAIT;
871 }
872 }
873
874 if (do_send) {
875 lnet_net_unlock(cpt);
876 lnet_ni_send(ni, msg);
877 lnet_net_lock(cpt);
878 }
879 return LNET_CREDIT_OK;
880 }
881
882 static lnet_rtrbufpool_t *
883 lnet_msg2bufpool(lnet_msg_t *msg)
884 {
885 lnet_rtrbufpool_t *rbp;
886 int cpt;
887
888 LASSERT(msg->msg_rx_committed);
889
890 cpt = msg->msg_rx_cpt;
891 rbp = &the_lnet.ln_rtrpools[cpt][0];
892
893 LASSERT(msg->msg_len <= LNET_MTU);
894 while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
895 rbp++;
896 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
897 }
898
899 return rbp;
900 }
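/*
 * Editorial note (not in the original source): the loop above walks the
 * per-CPT router buffer pools in order of increasing buffer size.  Assuming
 * the conventional three pools of 0, 1 and LNET_MTU/PAGE_CACHE_SIZE pages
 * (an assumption about the standard router-buffer configuration), a 0-byte
 * message maps to the 0-page pool, a 300-byte payload skips it
 * (300 > 0 * PAGE_CACHE_SIZE) and lands in the 1-page pool, and anything
 * larger than one page ends up in the LNET_MTU-sized pool.
 */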
901
902 static int
903 lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
904 {
905 /*
906 * lnet_parse is going to lnet_net_unlock immediately after this, so it
907 	 * sets do_recv FALSE and I don't do the unlock/recv/lock bit.
908 	 * I return LNET_CREDIT_WAIT if the msg blocked and LNET_CREDIT_OK if
909 	 * it was received or is OK to receive
910 */
911 lnet_peer_t *lp = msg->msg_rxpeer;
912 lnet_rtrbufpool_t *rbp;
913 lnet_rtrbuf_t *rb;
914
915 LASSERT(!msg->msg_iov);
916 LASSERT(!msg->msg_kiov);
917 LASSERT(!msg->msg_niov);
918 LASSERT(msg->msg_routing);
919 LASSERT(msg->msg_receiving);
920 LASSERT(!msg->msg_sending);
921
922 /* non-lnet_parse callers only receive delayed messages */
923 LASSERT(!do_recv || msg->msg_rx_delayed);
924
925 if (!msg->msg_peerrtrcredit) {
926 LASSERT((lp->lp_rtrcredits < 0) ==
927 !list_empty(&lp->lp_rtrq));
928
929 msg->msg_peerrtrcredit = 1;
930 lp->lp_rtrcredits--;
931 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
932 lp->lp_minrtrcredits = lp->lp_rtrcredits;
933
934 if (lp->lp_rtrcredits < 0) {
935 /* must have checked eager_recv before here */
936 LASSERT(msg->msg_rx_ready_delay);
937 msg->msg_rx_delayed = 1;
938 list_add_tail(&msg->msg_list, &lp->lp_rtrq);
939 return LNET_CREDIT_WAIT;
940 }
941 }
942
943 rbp = lnet_msg2bufpool(msg);
944
945 if (!msg->msg_rtrcredit) {
946 msg->msg_rtrcredit = 1;
947 rbp->rbp_credits--;
948 if (rbp->rbp_credits < rbp->rbp_mincredits)
949 rbp->rbp_mincredits = rbp->rbp_credits;
950
951 if (rbp->rbp_credits < 0) {
952 /* must have checked eager_recv before here */
953 LASSERT(msg->msg_rx_ready_delay);
954 msg->msg_rx_delayed = 1;
955 list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
956 return LNET_CREDIT_WAIT;
957 }
958 }
959
960 LASSERT(!list_empty(&rbp->rbp_bufs));
961 rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
962 list_del(&rb->rb_list);
963
964 msg->msg_niov = rbp->rbp_npages;
965 msg->msg_kiov = &rb->rb_kiov[0];
966
967 if (do_recv) {
968 int cpt = msg->msg_rx_cpt;
969
970 lnet_net_unlock(cpt);
971 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
972 0, msg->msg_len, msg->msg_len);
973 lnet_net_lock(cpt);
974 }
975 return LNET_CREDIT_OK;
976 }
977
978 void
979 lnet_return_tx_credits_locked(lnet_msg_t *msg)
980 {
981 lnet_peer_t *txpeer = msg->msg_txpeer;
982 lnet_msg_t *msg2;
983
984 if (msg->msg_txcredit) {
985 struct lnet_ni *ni = txpeer->lp_ni;
986 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
987
988 /* give back NI txcredits */
989 msg->msg_txcredit = 0;
990
991 LASSERT((tq->tq_credits < 0) ==
992 !list_empty(&tq->tq_delayed));
993
994 tq->tq_credits++;
995 if (tq->tq_credits <= 0) {
996 msg2 = list_entry(tq->tq_delayed.next,
997 lnet_msg_t, msg_list);
998 list_del(&msg2->msg_list);
999
1000 LASSERT(msg2->msg_txpeer->lp_ni == ni);
1001 LASSERT(msg2->msg_tx_delayed);
1002
1003 (void) lnet_post_send_locked(msg2, 1);
1004 }
1005 }
1006
1007 if (msg->msg_peertxcredit) {
1008 /* give back peer txcredits */
1009 msg->msg_peertxcredit = 0;
1010
1011 LASSERT((txpeer->lp_txcredits < 0) ==
1012 !list_empty(&txpeer->lp_txq));
1013
1014 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
1015 LASSERT(txpeer->lp_txqnob >= 0);
1016
1017 txpeer->lp_txcredits++;
1018 if (txpeer->lp_txcredits <= 0) {
1019 msg2 = list_entry(txpeer->lp_txq.next,
1020 lnet_msg_t, msg_list);
1021 list_del(&msg2->msg_list);
1022
1023 LASSERT(msg2->msg_txpeer == txpeer);
1024 LASSERT(msg2->msg_tx_delayed);
1025
1026 (void) lnet_post_send_locked(msg2, 1);
1027 }
1028 }
1029
1030 if (txpeer) {
1031 msg->msg_txpeer = NULL;
1032 lnet_peer_decref_locked(txpeer);
1033 }
1034 }
1035
1036 void
1037 lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
1038 {
1039 lnet_msg_t *msg;
1040
1041 if (list_empty(&rbp->rbp_msgs))
1042 return;
1043 msg = list_entry(rbp->rbp_msgs.next,
1044 lnet_msg_t, msg_list);
1045 list_del(&msg->msg_list);
1046
1047 (void)lnet_post_routed_recv_locked(msg, 1);
1048 }
1049
1050 void
1051 lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
1052 {
1053 struct list_head drop;
1054 lnet_msg_t *msg;
1055 lnet_msg_t *tmp;
1056
1057 INIT_LIST_HEAD(&drop);
1058
1059 list_splice_init(list, &drop);
1060
1061 lnet_net_unlock(cpt);
1062
1063 list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
1064 lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
1065 0, 0, 0, msg->msg_hdr.payload_length);
1066 list_del_init(&msg->msg_list);
1067 lnet_finalize(NULL, msg, -ECANCELED);
1068 }
1069
1070 lnet_net_lock(cpt);
1071 }
1072
1073 void
1074 lnet_return_rx_credits_locked(lnet_msg_t *msg)
1075 {
1076 lnet_peer_t *rxpeer = msg->msg_rxpeer;
1077 lnet_msg_t *msg2;
1078
1079 if (msg->msg_rtrcredit) {
1080 /* give back global router credits */
1081 lnet_rtrbuf_t *rb;
1082 lnet_rtrbufpool_t *rbp;
1083
1084 /*
1085 * NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1086 * there until it gets one allocated, or aborts the wait
1087 * itself
1088 */
1089 LASSERT(msg->msg_kiov);
1090
1091 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1092 rbp = rb->rb_pool;
1093
1094 msg->msg_kiov = NULL;
1095 msg->msg_rtrcredit = 0;
1096
1097 LASSERT(rbp == lnet_msg2bufpool(msg));
1098
1099 LASSERT((rbp->rbp_credits > 0) ==
1100 !list_empty(&rbp->rbp_bufs));
1101
1102 /*
1103 * If routing is now turned off, we just drop this buffer and
1104 * don't bother trying to return credits.
1105 */
1106 if (!the_lnet.ln_routing) {
1107 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1108 goto routing_off;
1109 }
1110
1111 /*
1112 * It is possible that a user has lowered the desired number of
1113 * buffers in this pool. Make sure we never put back
1114 * more buffers than the stated number.
1115 */
1116 if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
1117 /* Discard this buffer so we don't have too many. */
1118 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1119 rbp->rbp_nbuffers--;
1120 } else {
1121 list_add(&rb->rb_list, &rbp->rbp_bufs);
1122 rbp->rbp_credits++;
1123 if (rbp->rbp_credits <= 0)
1124 lnet_schedule_blocked_locked(rbp);
1125 }
1126 }
1127
1128 routing_off:
1129 if (msg->msg_peerrtrcredit) {
1130 /* give back peer router credits */
1131 msg->msg_peerrtrcredit = 0;
1132
1133 LASSERT((rxpeer->lp_rtrcredits < 0) ==
1134 !list_empty(&rxpeer->lp_rtrq));
1135
1136 rxpeer->lp_rtrcredits++;
1137 /*
1138 * drop all messages which are queued to be routed on that
1139 * peer.
1140 */
1141 if (!the_lnet.ln_routing) {
1142 lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
1143 msg->msg_rx_cpt);
1144 } else if (rxpeer->lp_rtrcredits <= 0) {
1145 msg2 = list_entry(rxpeer->lp_rtrq.next,
1146 lnet_msg_t, msg_list);
1147 list_del(&msg2->msg_list);
1148
1149 (void) lnet_post_routed_recv_locked(msg2, 1);
1150 }
1151 }
1152 if (rxpeer) {
1153 msg->msg_rxpeer = NULL;
1154 lnet_peer_decref_locked(rxpeer);
1155 }
1156 }
1157
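/*
 * Editorial note (added for clarity, not in the original source): this is
 * where the patch subject applies -- a route whose hop count was never
 * explicitly configured carries LNET_UNDEFINED_HOPS, and
 * lnet_compare_routes() treats it as a single hop instead of letting the
 * sentinel value skew the comparison.  Priority is compared first, then hop
 * count, then the gateway's queued bytes and tx credits, with the sequence
 * number as the final round-robin tie-breaker.
 */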
1158 static int
1159 lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1160 {
1161 lnet_peer_t *p1 = r1->lr_gateway;
1162 lnet_peer_t *p2 = r2->lr_gateway;
1163 int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
1164 int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
1165
1166 if (r1->lr_priority < r2->lr_priority)
1167 return 1;
1168
1169 if (r1->lr_priority > r2->lr_priority)
1170 return -1;
1171
1172 if (r1_hops < r2_hops)
1173 return 1;
1174
1175 if (r1_hops > r2_hops)
1176 return -1;
1177
1178 if (p1->lp_txqnob < p2->lp_txqnob)
1179 return 1;
1180
1181 if (p1->lp_txqnob > p2->lp_txqnob)
1182 return -1;
1183
1184 if (p1->lp_txcredits > p2->lp_txcredits)
1185 return 1;
1186
1187 if (p1->lp_txcredits < p2->lp_txcredits)
1188 return -1;
1189
1190 if (r1->lr_seq - r2->lr_seq <= 0)
1191 return 1;
1192
1193 return -1;
1194 }
1195
1196 static lnet_peer_t *
1197 lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
1198 {
1199 lnet_remotenet_t *rnet;
1200 lnet_route_t *route;
1201 lnet_route_t *best_route;
1202 lnet_route_t *last_route;
1203 struct lnet_peer *lp_best;
1204 struct lnet_peer *lp;
1205 int rc;
1206
1207 /*
1208 * If @rtr_nid is not LNET_NID_ANY, return the gateway with
1209 * rtr_nid nid, otherwise find the best gateway I can use
1210 */
1211 rnet = lnet_find_net_locked(LNET_NIDNET(target));
1212 if (!rnet)
1213 return NULL;
1214
1215 lp_best = NULL;
1216 best_route = NULL;
1217 last_route = NULL;
1218 list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1219 lp = route->lr_gateway;
1220
1221 if (!lnet_is_route_alive(route))
1222 continue;
1223
1224 if (ni && lp->lp_ni != ni)
1225 continue;
1226
1227 if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
1228 return lp;
1229
1230 if (!lp_best) {
1231 best_route = route;
1232 last_route = route;
1233 lp_best = lp;
1234 continue;
1235 }
1236
1237 /* no protection on below fields, but it's harmless */
1238 if (last_route->lr_seq - route->lr_seq < 0)
1239 last_route = route;
1240
1241 rc = lnet_compare_routes(route, best_route);
1242 if (rc < 0)
1243 continue;
1244
1245 best_route = route;
1246 lp_best = lp;
1247 }
1248
1249 /*
1250 	 * Set the sequence number on the best route to the latest sequence + 1
1251 	 * so we can round-robin all routers; this is racy and inaccurate, but
1252 	 * harmless and functional
1253 */
1254 if (best_route)
1255 best_route->lr_seq = last_route->lr_seq + 1;
1256 return lp_best;
1257 }
1258
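/*
 * Editorial summary (not in the original source): lnet_send() below commits
 * the message to a transmit path.  If the destination network is local it
 * resolves the peer directly with lnet_nid2peer_locked() (the loopback NI
 * bypasses credits entirely); otherwise it picks a gateway with
 * lnet_find_route_locked(), retargets the message at that router, and in
 * both cases hands off to lnet_post_send_locked(), which may queue the
 * message until peer and NI tx credits become available.
 */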
1259 int
1260 lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1261 {
1262 lnet_nid_t dst_nid = msg->msg_target.nid;
1263 struct lnet_ni *src_ni;
1264 struct lnet_ni *local_ni;
1265 struct lnet_peer *lp;
1266 int cpt;
1267 int cpt2;
1268 int rc;
1269
1270 /*
1271 * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
1272 * but we might want to use pre-determined router for ACK/REPLY
1273 * in the future
1274 */
1275 /* NB: ni == interface pre-determined (ACK/REPLY) */
1276 LASSERT(!msg->msg_txpeer);
1277 LASSERT(!msg->msg_sending);
1278 LASSERT(!msg->msg_target_is_router);
1279 LASSERT(!msg->msg_receiving);
1280
1281 msg->msg_sending = 1;
1282
1283 LASSERT(!msg->msg_tx_committed);
1284 cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1285 again:
1286 lnet_net_lock(cpt);
1287
1288 if (the_lnet.ln_shutdown) {
1289 lnet_net_unlock(cpt);
1290 return -ESHUTDOWN;
1291 }
1292
1293 if (src_nid == LNET_NID_ANY) {
1294 src_ni = NULL;
1295 } else {
1296 src_ni = lnet_nid2ni_locked(src_nid, cpt);
1297 if (!src_ni) {
1298 lnet_net_unlock(cpt);
1299 LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
1300 libcfs_nid2str(dst_nid),
1301 libcfs_nid2str(src_nid));
1302 return -EINVAL;
1303 }
1304 LASSERT(!msg->msg_routing);
1305 }
1306
1307 /* Is this for someone on a local network? */
1308 local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1309
1310 if (local_ni) {
1311 if (!src_ni) {
1312 src_ni = local_ni;
1313 src_nid = src_ni->ni_nid;
1314 } else if (src_ni == local_ni) {
1315 lnet_ni_decref_locked(local_ni, cpt);
1316 } else {
1317 lnet_ni_decref_locked(local_ni, cpt);
1318 lnet_ni_decref_locked(src_ni, cpt);
1319 lnet_net_unlock(cpt);
1320 			LCONSOLE_WARN("No route to %s from %s\n",
1321 libcfs_nid2str(dst_nid),
1322 libcfs_nid2str(src_nid));
1323 return -EINVAL;
1324 }
1325
1326 LASSERT(src_nid != LNET_NID_ANY);
1327 lnet_msg_commit(msg, cpt);
1328
1329 if (!msg->msg_routing)
1330 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1331
1332 if (src_ni == the_lnet.ln_loni) {
1333 /* No send credit hassles with LOLND */
1334 lnet_net_unlock(cpt);
1335 lnet_ni_send(src_ni, msg);
1336
1337 lnet_net_lock(cpt);
1338 lnet_ni_decref_locked(src_ni, cpt);
1339 lnet_net_unlock(cpt);
1340 return 0;
1341 }
1342
1343 rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1344 /* lp has ref on src_ni; lose mine */
1345 lnet_ni_decref_locked(src_ni, cpt);
1346 if (rc) {
1347 lnet_net_unlock(cpt);
1348 LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1349 libcfs_nid2str(dst_nid));
1350 /* ENOMEM or shutting down */
1351 return rc;
1352 }
1353 LASSERT(lp->lp_ni == src_ni);
1354 } else {
1355 /* sending to a remote network */
1356 lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1357 if (!lp) {
1358 if (src_ni)
1359 lnet_ni_decref_locked(src_ni, cpt);
1360 lnet_net_unlock(cpt);
1361
1362 LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
1363 libcfs_id2str(msg->msg_target),
1364 libcfs_nid2str(src_nid));
1365 return -EHOSTUNREACH;
1366 }
1367
1368 /*
1369 		 * rtr_nid is LNET_NID_ANY or the NID of a pre-determined router.
1370 		 * It's possible that rtr_nid isn't LNET_NID_ANY and yet lp isn't the
1371 		 * pre-determined router; this can happen if the routing table was
1372 		 * changed while we released the lock.
1373 */
1374 if (rtr_nid != lp->lp_nid) {
1375 cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1376 if (cpt2 != cpt) {
1377 if (src_ni)
1378 lnet_ni_decref_locked(src_ni, cpt);
1379 lnet_net_unlock(cpt);
1380
1381 rtr_nid = lp->lp_nid;
1382 cpt = cpt2;
1383 goto again;
1384 }
1385 }
1386
1387 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1388 libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1389 lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1390
1391 if (!src_ni) {
1392 src_ni = lp->lp_ni;
1393 src_nid = src_ni->ni_nid;
1394 } else {
1395 LASSERT(src_ni == lp->lp_ni);
1396 lnet_ni_decref_locked(src_ni, cpt);
1397 }
1398
1399 lnet_peer_addref_locked(lp);
1400
1401 LASSERT(src_nid != LNET_NID_ANY);
1402 lnet_msg_commit(msg, cpt);
1403
1404 if (!msg->msg_routing) {
1405 /* I'm the source and now I know which NI to send on */
1406 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1407 }
1408
1409 msg->msg_target_is_router = 1;
1410 msg->msg_target.nid = lp->lp_nid;
1411 msg->msg_target.pid = LNET_PID_LUSTRE;
1412 }
1413
1414 /* 'lp' is our best choice of peer */
1415
1416 LASSERT(!msg->msg_peertxcredit);
1417 LASSERT(!msg->msg_txcredit);
1418 LASSERT(!msg->msg_txpeer);
1419
1420 msg->msg_txpeer = lp; /* msg takes my ref on lp */
1421
1422 rc = lnet_post_send_locked(msg, 0);
1423 lnet_net_unlock(cpt);
1424
1425 if (rc < 0)
1426 return rc;
1427
1428 if (rc == LNET_CREDIT_OK)
1429 lnet_ni_send(src_ni, msg);
1430
1431 return 0; /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
1432 }
1433
1434 static void
1435 lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1436 {
1437 lnet_net_lock(cpt);
1438 the_lnet.ln_counters[cpt]->drop_count++;
1439 the_lnet.ln_counters[cpt]->drop_length += nob;
1440 lnet_net_unlock(cpt);
1441
1442 lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1443 }
1444
1445 static void
1446 lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1447 {
1448 lnet_hdr_t *hdr = &msg->msg_hdr;
1449
1450 if (msg->msg_wanted)
1451 lnet_setpayloadbuffer(msg);
1452
1453 lnet_build_msg_event(msg, LNET_EVENT_PUT);
1454
1455 /*
1456 * Must I ACK? If so I'll grab the ack_wmd out of the header and put
1457 * it back into the ACK during lnet_finalize()
1458 */
1459 msg->msg_ack = !lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1460 !(msg->msg_md->md_options & LNET_MD_ACK_DISABLE);
1461
1462 lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1463 msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1464 }
1465
1466 static int
1467 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1468 {
1469 lnet_hdr_t *hdr = &msg->msg_hdr;
1470 struct lnet_match_info info;
1471 bool ready_delay;
1472 int rc;
1473
1474 /* Convert put fields to host byte order */
1475 hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1476 hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1477 hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1478
1479 info.mi_id.nid = hdr->src_nid;
1480 info.mi_id.pid = hdr->src_pid;
1481 info.mi_opc = LNET_MD_OP_PUT;
1482 info.mi_portal = hdr->msg.put.ptl_index;
1483 info.mi_rlength = hdr->payload_length;
1484 info.mi_roffset = hdr->msg.put.offset;
1485 info.mi_mbits = hdr->msg.put.match_bits;
1486
1487 msg->msg_rx_ready_delay = !ni->ni_lnd->lnd_eager_recv;
1488 ready_delay = msg->msg_rx_ready_delay;
1489
1490 again:
1491 rc = lnet_ptl_match_md(&info, msg);
1492 switch (rc) {
1493 default:
1494 LBUG();
1495
1496 case LNET_MATCHMD_OK:
1497 lnet_recv_put(ni, msg);
1498 return 0;
1499
1500 case LNET_MATCHMD_NONE:
1501 		/*
1502 		 * No eager_recv, or it has already been called: the message
1503 		 * should have been attached to the delayed list.
1504 */
1505 if (ready_delay)
1506 return 0;
1507
1508 rc = lnet_ni_eager_recv(ni, msg);
1509 if (!rc) {
1510 ready_delay = true;
1511 goto again;
1512 }
1513 /* fall through */
1514
1515 case LNET_MATCHMD_DROP:
1516 CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
1517 libcfs_id2str(info.mi_id), info.mi_portal,
1518 info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1519
1520 return ENOENT; /* +ve: OK but no match */
1521 }
1522 }
1523
1524 static int
1525 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1526 {
1527 struct lnet_match_info info;
1528 lnet_hdr_t *hdr = &msg->msg_hdr;
1529 lnet_handle_wire_t reply_wmd;
1530 int rc;
1531
1532 /* Convert get fields to host byte order */
1533 hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1534 hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1535 hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1536 hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1537
1538 info.mi_id.nid = hdr->src_nid;
1539 info.mi_id.pid = hdr->src_pid;
1540 info.mi_opc = LNET_MD_OP_GET;
1541 info.mi_portal = hdr->msg.get.ptl_index;
1542 info.mi_rlength = hdr->msg.get.sink_length;
1543 info.mi_roffset = hdr->msg.get.src_offset;
1544 info.mi_mbits = hdr->msg.get.match_bits;
1545
1546 rc = lnet_ptl_match_md(&info, msg);
1547 if (rc == LNET_MATCHMD_DROP) {
1548 CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
1549 libcfs_id2str(info.mi_id), info.mi_portal,
1550 info.mi_mbits, info.mi_roffset, info.mi_rlength);
1551 return ENOENT; /* +ve: OK but no match */
1552 }
1553
1554 LASSERT(rc == LNET_MATCHMD_OK);
1555
1556 lnet_build_msg_event(msg, LNET_EVENT_GET);
1557
1558 reply_wmd = hdr->msg.get.return_wmd;
1559
1560 lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1561 msg->msg_offset, msg->msg_wanted);
1562
1563 msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1564
1565 if (rdma_get) {
1566 /* The LND completes the REPLY from her recv procedure */
1567 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1568 msg->msg_offset, msg->msg_len, msg->msg_len);
1569 return 0;
1570 }
1571
1572 lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1573 msg->msg_receiving = 0;
1574
1575 rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1576 if (rc < 0) {
1577 /* didn't get as far as lnet_ni_send() */
1578 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1579 libcfs_nid2str(ni->ni_nid),
1580 libcfs_id2str(info.mi_id), rc);
1581
1582 lnet_finalize(ni, msg, rc);
1583 }
1584
1585 return 0;
1586 }
1587
1588 static int
1589 lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1590 {
1591 void *private = msg->msg_private;
1592 lnet_hdr_t *hdr = &msg->msg_hdr;
1593 lnet_process_id_t src = {0};
1594 lnet_libmd_t *md;
1595 int rlength;
1596 int mlength;
1597 int cpt;
1598
1599 cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1600 lnet_res_lock(cpt);
1601
1602 src.nid = hdr->src_nid;
1603 src.pid = hdr->src_pid;
1604
1605 /* NB handles only looked up by creator (no flips) */
1606 md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1607 if (!md || !md->md_threshold || md->md_me) {
1608 CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
1609 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1610 !md ? "invalid" : "inactive",
1611 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1612 hdr->msg.reply.dst_wmd.wh_object_cookie);
1613 if (md && md->md_me)
1614 CERROR("REPLY MD also attached to portal %d\n",
1615 md->md_me->me_portal);
1616
1617 lnet_res_unlock(cpt);
1618 return ENOENT; /* +ve: OK but no match */
1619 }
1620
1621 LASSERT(!md->md_offset);
1622
1623 rlength = hdr->payload_length;
1624 mlength = min_t(uint, rlength, md->md_length);
1625
1626 if (mlength < rlength &&
1627 !(md->md_options & LNET_MD_TRUNCATE)) {
1628 CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
1629 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1630 rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1631 mlength);
1632 lnet_res_unlock(cpt);
1633 return ENOENT; /* +ve: OK but no match */
1634 }
1635
1636 CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
1637 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1638 mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1639
1640 lnet_msg_attach_md(msg, md, 0, mlength);
1641
1642 if (mlength)
1643 lnet_setpayloadbuffer(msg);
1644
1645 lnet_res_unlock(cpt);
1646
1647 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1648
1649 lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1650 return 0;
1651 }
1652
1653 static int
1654 lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1655 {
1656 lnet_hdr_t *hdr = &msg->msg_hdr;
1657 lnet_process_id_t src = {0};
1658 lnet_libmd_t *md;
1659 int cpt;
1660
1661 src.nid = hdr->src_nid;
1662 src.pid = hdr->src_pid;
1663
1664 /* Convert ack fields to host byte order */
1665 hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1666 hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1667
1668 cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1669 lnet_res_lock(cpt);
1670
1671 /* NB handles only looked up by creator (no flips) */
1672 md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1673 if (!md || !md->md_threshold || md->md_me) {
1674 /* Don't moan; this is expected */
1675 CDEBUG(D_NET,
1676 "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
1677 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1678 !md ? "invalid" : "inactive",
1679 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1680 hdr->msg.ack.dst_wmd.wh_object_cookie);
1681 if (md && md->md_me)
1682 CERROR("Source MD also attached to portal %d\n",
1683 md->md_me->me_portal);
1684
1685 lnet_res_unlock(cpt);
1686 return ENOENT; /* +ve! */
1687 }
1688
1689 CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
1690 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1691 hdr->msg.ack.dst_wmd.wh_object_cookie);
1692
1693 lnet_msg_attach_md(msg, md, 0, 0);
1694
1695 lnet_res_unlock(cpt);
1696
1697 lnet_build_msg_event(msg, LNET_EVENT_ACK);
1698
1699 lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1700 return 0;
1701 }
1702
1703 /**
1704 * \retval LNET_CREDIT_OK If \a msg is forwarded
1705 * \retval LNET_CREDIT_WAIT If \a msg is blocked because w/o buffer
1706 * \retval -ve error code
1707 */
1708 static int
1709 lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1710 {
1711 int rc = 0;
1712
1713 if (!the_lnet.ln_routing)
1714 return -ECANCELED;
1715
1716 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1717 lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1718 if (!ni->ni_lnd->lnd_eager_recv) {
1719 msg->msg_rx_ready_delay = 1;
1720 } else {
1721 lnet_net_unlock(msg->msg_rx_cpt);
1722 rc = lnet_ni_eager_recv(ni, msg);
1723 lnet_net_lock(msg->msg_rx_cpt);
1724 }
1725 }
1726
1727 if (!rc)
1728 rc = lnet_post_routed_recv_locked(msg, 0);
1729 return rc;
1730 }
1731
1732 char *
1733 lnet_msgtyp2str(int type)
1734 {
1735 switch (type) {
1736 case LNET_MSG_ACK:
1737 return "ACK";
1738 case LNET_MSG_PUT:
1739 return "PUT";
1740 case LNET_MSG_GET:
1741 return "GET";
1742 case LNET_MSG_REPLY:
1743 return "REPLY";
1744 case LNET_MSG_HELLO:
1745 return "HELLO";
1746 default:
1747 return "<UNKNOWN>";
1748 }
1749 }
1750
1751 void
1752 lnet_print_hdr(lnet_hdr_t *hdr)
1753 {
1754 lnet_process_id_t src = {0};
1755 lnet_process_id_t dst = {0};
1756 char *type_str = lnet_msgtyp2str(hdr->type);
1757
1758 src.nid = hdr->src_nid;
1759 src.pid = hdr->src_pid;
1760
1761 dst.nid = hdr->dest_nid;
1762 dst.pid = hdr->dest_pid;
1763
1764 CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1765 CWARN(" From %s\n", libcfs_id2str(src));
1766 CWARN(" To %s\n", libcfs_id2str(dst));
1767
1768 switch (hdr->type) {
1769 default:
1770 break;
1771
1772 case LNET_MSG_PUT:
1773 CWARN(" Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1774 hdr->msg.put.ptl_index,
1775 hdr->msg.put.ack_wmd.wh_interface_cookie,
1776 hdr->msg.put.ack_wmd.wh_object_cookie,
1777 hdr->msg.put.match_bits);
1778 CWARN(" Length %d, offset %d, hdr data %#llx\n",
1779 hdr->payload_length, hdr->msg.put.offset,
1780 hdr->msg.put.hdr_data);
1781 break;
1782
1783 case LNET_MSG_GET:
1784 CWARN(" Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1785 hdr->msg.get.ptl_index,
1786 hdr->msg.get.return_wmd.wh_interface_cookie,
1787 hdr->msg.get.return_wmd.wh_object_cookie,
1788 hdr->msg.get.match_bits);
1789 CWARN(" Length %d, src offset %d\n",
1790 hdr->msg.get.sink_length,
1791 hdr->msg.get.src_offset);
1792 break;
1793
1794 case LNET_MSG_ACK:
1795 CWARN(" dst md %#llx.%#llx, manipulated length %d\n",
1796 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1797 hdr->msg.ack.dst_wmd.wh_object_cookie,
1798 hdr->msg.ack.mlength);
1799 break;
1800
1801 case LNET_MSG_REPLY:
1802 CWARN(" dst md %#llx.%#llx, length %d\n",
1803 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1804 hdr->msg.reply.dst_wmd.wh_object_cookie,
1805 hdr->payload_length);
1806 }
1807 }
1808
1809 int
1810 lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1811 void *private, int rdma_req)
1812 {
1813 int rc = 0;
1814 int cpt;
1815 int for_me;
1816 struct lnet_msg *msg;
1817 lnet_pid_t dest_pid;
1818 lnet_nid_t dest_nid;
1819 lnet_nid_t src_nid;
1820 __u32 payload_length;
1821 __u32 type;
1822
1823 LASSERT(!in_interrupt());
1824
1825 type = le32_to_cpu(hdr->type);
1826 src_nid = le64_to_cpu(hdr->src_nid);
1827 dest_nid = le64_to_cpu(hdr->dest_nid);
1828 dest_pid = le32_to_cpu(hdr->dest_pid);
1829 payload_length = le32_to_cpu(hdr->payload_length);
1830
1831 for_me = (ni->ni_nid == dest_nid);
1832 cpt = lnet_cpt_of_nid(from_nid);
1833
1834 switch (type) {
1835 case LNET_MSG_ACK:
1836 case LNET_MSG_GET:
1837 if (payload_length > 0) {
1838 CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1839 libcfs_nid2str(from_nid),
1840 libcfs_nid2str(src_nid),
1841 lnet_msgtyp2str(type), payload_length);
1842 return -EPROTO;
1843 }
1844 break;
1845
1846 case LNET_MSG_PUT:
1847 case LNET_MSG_REPLY:
1848 if (payload_length >
1849 (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1850 CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
1851 libcfs_nid2str(from_nid),
1852 libcfs_nid2str(src_nid),
1853 lnet_msgtyp2str(type),
1854 payload_length,
1855 for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1856 return -EPROTO;
1857 }
1858 break;
1859
1860 default:
1861 CERROR("%s, src %s: Bad message type 0x%x\n",
1862 libcfs_nid2str(from_nid),
1863 libcfs_nid2str(src_nid), type);
1864 return -EPROTO;
1865 }
1866
1867 if (the_lnet.ln_routing &&
1868 ni->ni_last_alive != ktime_get_real_seconds()) {
1869 		/* NB: so far this is the only place where NI status is set to "up" */
1870 lnet_ni_lock(ni);
1871 ni->ni_last_alive = ktime_get_real_seconds();
1872 if (ni->ni_status &&
1873 ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1874 ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1875 lnet_ni_unlock(ni);
1876 }
1877
1878 /*
1879 * Regard a bad destination NID as a protocol error. Senders should
1880 * know what they're doing; if they don't they're misconfigured, buggy
1881 * or malicious so we chop them off at the knees :)
1882 */
1883 if (!for_me) {
1884 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1885 /* should have gone direct */
1886 CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
1887 libcfs_nid2str(from_nid),
1888 libcfs_nid2str(src_nid),
1889 libcfs_nid2str(dest_nid));
1890 return -EPROTO;
1891 }
1892
1893 if (lnet_islocalnid(dest_nid)) {
1894 /*
1895 * dest is another local NI; sender should have used
1896 * this node's NID on its own network
1897 */
1898 CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
1899 libcfs_nid2str(from_nid),
1900 libcfs_nid2str(src_nid),
1901 libcfs_nid2str(dest_nid));
1902 return -EPROTO;
1903 }
1904
1905 if (rdma_req && type == LNET_MSG_GET) {
1906 CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
1907 libcfs_nid2str(from_nid),
1908 libcfs_nid2str(src_nid),
1909 libcfs_nid2str(dest_nid));
1910 return -EPROTO;
1911 }
1912
1913 if (!the_lnet.ln_routing) {
1914 CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
1915 libcfs_nid2str(from_nid),
1916 libcfs_nid2str(src_nid),
1917 libcfs_nid2str(dest_nid));
1918 goto drop;
1919 }
1920 }
1921
1922 /*
1923 * Message looks OK; we're not going to return an error, so we MUST
1924 * call back lnd_recv() come what may...
1925 */
1926 if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
1927 fail_peer(src_nid, 0)) { /* shall we now? */
1928 CERROR("%s, src %s: Dropping %s to simulate failure\n",
1929 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1930 lnet_msgtyp2str(type));
1931 goto drop;
1932 }
1933
1934 msg = lnet_msg_alloc();
1935 if (!msg) {
1936 CERROR("%s, src %s: Dropping %s (out of memory)\n",
1937 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1938 lnet_msgtyp2str(type));
1939 goto drop;
1940 }
1941
1942 /* msg zeroed in lnet_msg_alloc;
1943 * i.e. flags all clear, pointers NULL etc
1944 */
1945 msg->msg_type = type;
1946 msg->msg_private = private;
1947 msg->msg_receiving = 1;
1948 msg->msg_wanted = payload_length;
1949 msg->msg_len = payload_length;
1950 msg->msg_offset = 0;
1951 msg->msg_hdr = *hdr;
1952 /* for building message event */
1953 msg->msg_from = from_nid;
1954 if (!for_me) {
1955 msg->msg_target.pid = dest_pid;
1956 msg->msg_target.nid = dest_nid;
1957 msg->msg_routing = 1;
1958
1959 } else {
1960 /* convert common msg->hdr fields to host byteorder */
1961 msg->msg_hdr.type = type;
1962 msg->msg_hdr.src_nid = src_nid;
1963 msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
1964 msg->msg_hdr.dest_nid = dest_nid;
1965 msg->msg_hdr.dest_pid = dest_pid;
1966 msg->msg_hdr.payload_length = payload_length;
1967 }
1968
1969 lnet_net_lock(cpt);
1970 rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1971 if (rc) {
1972 lnet_net_unlock(cpt);
1973 CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
1974 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1975 lnet_msgtyp2str(type), rc);
1976 lnet_msg_free(msg);
1977 goto drop;
1978 }
1979
1980 if (lnet_isrouter(msg->msg_rxpeer)) {
1981 lnet_peer_set_alive(msg->msg_rxpeer);
1982 if (avoid_asym_router_failure &&
1983 LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
1984 			/* received a remote message via a router; update the
1985 			 * remote NI status on this router.
1986 			 * NB: multi-hop routed messages will be ignored.
1987 */
1988 lnet_router_ni_update_locked(msg->msg_rxpeer,
1989 LNET_NIDNET(src_nid));
1990 }
1991 }
1992
1993 lnet_msg_commit(msg, cpt);
1994
1995 if (!for_me) {
1996 rc = lnet_parse_forward_locked(ni, msg);
1997 lnet_net_unlock(cpt);
1998
1999 if (rc < 0)
2000 goto free_drop;
2001
2002 if (rc == LNET_CREDIT_OK) {
2003 lnet_ni_recv(ni, msg->msg_private, msg, 0,
2004 0, payload_length, payload_length);
2005 }
2006 return 0;
2007 }
2008
2009 lnet_net_unlock(cpt);
2010
2011 switch (type) {
2012 case LNET_MSG_ACK:
2013 rc = lnet_parse_ack(ni, msg);
2014 break;
2015 case LNET_MSG_PUT:
2016 rc = lnet_parse_put(ni, msg);
2017 break;
2018 case LNET_MSG_GET:
2019 rc = lnet_parse_get(ni, msg, rdma_req);
2020 break;
2021 case LNET_MSG_REPLY:
2022 rc = lnet_parse_reply(ni, msg);
2023 break;
2024 default:
2025 LASSERT(0);
2026 rc = -EPROTO;
2027 goto free_drop; /* prevent an unused label if !kernel */
2028 }
2029
2030 if (!rc)
2031 return 0;
2032
2033 LASSERT(rc == ENOENT);
2034
2035 free_drop:
2036 LASSERT(!msg->msg_md);
2037 lnet_finalize(ni, msg, rc);
2038
2039 drop:
2040 lnet_drop_message(ni, cpt, private, payload_length);
2041 return 0;
2042 }
2043 EXPORT_SYMBOL(lnet_parse);
2044
2045 void
2046 lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
2047 {
2048 while (!list_empty(head)) {
2049 lnet_process_id_t id = {0};
2050 lnet_msg_t *msg;
2051
2052 msg = list_entry(head->next, lnet_msg_t, msg_list);
2053 list_del(&msg->msg_list);
2054
2055 id.nid = msg->msg_hdr.src_nid;
2056 id.pid = msg->msg_hdr.src_pid;
2057
2058 LASSERT(!msg->msg_md);
2059 LASSERT(msg->msg_rx_delayed);
2060 LASSERT(msg->msg_rxpeer);
2061 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2062
2063 CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
2064 libcfs_id2str(id),
2065 msg->msg_hdr.msg.put.ptl_index,
2066 msg->msg_hdr.msg.put.match_bits,
2067 msg->msg_hdr.msg.put.offset,
2068 msg->msg_hdr.payload_length, reason);
2069
2070 /*
2071 * NB I can't drop msg's ref on msg_rxpeer until after I've
2072 * called lnet_drop_message(), so I just hang onto msg as well
2073 * until that's done
2074 */
2075 lnet_drop_message(msg->msg_rxpeer->lp_ni,
2076 msg->msg_rxpeer->lp_cpt,
2077 msg->msg_private, msg->msg_len);
2078 /*
2079 		 * NB: the message will not generate an event because it has no
2080 		 * attached MD, but we should still give an error code so that
2081 		 * lnet_msg_decommit() can skip counter operations and other checks.
2082 */
2083 lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
2084 }
2085 }
2086
2087 void
2088 lnet_recv_delayed_msg_list(struct list_head *head)
2089 {
2090 while (!list_empty(head)) {
2091 lnet_msg_t *msg;
2092 lnet_process_id_t id;
2093
2094 msg = list_entry(head->next, lnet_msg_t, msg_list);
2095 list_del(&msg->msg_list);
2096
2097 /*
2098 * md won't disappear under me, since each msg
2099 * holds a ref on it
2100 */
2101 id.nid = msg->msg_hdr.src_nid;
2102 id.pid = msg->msg_hdr.src_pid;
2103
2104 LASSERT(msg->msg_rx_delayed);
2105 LASSERT(msg->msg_md);
2106 LASSERT(msg->msg_rxpeer);
2107 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2108
2109 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
2110 libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
2111 msg->msg_hdr.msg.put.match_bits,
2112 msg->msg_hdr.msg.put.offset,
2113 msg->msg_hdr.payload_length);
2114
2115 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2116 }
2117 }
2118
2119 /**
2120 * Initiate an asynchronous PUT operation.
2121 *
2122 * There are several events associated with a PUT: completion of the send on
2123 * the initiator node (LNET_EVENT_SEND), and when the send completes
2124 * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
2125 * that the operation was accepted by the target. The event LNET_EVENT_PUT is
2126 * used at the target node to indicate the completion of incoming data
2127 * delivery.
2128 *
2129 * The local events will be logged in the EQ associated with the MD pointed to
2130 * by the \a mdh handle. Using an MD without an associated EQ results in
2131 * these events being discarded. In this case, the caller must have another
2132 * mechanism (e.g., a higher level protocol) for determining when it is safe
2133 * to modify the memory region associated with the MD.
2134 *
2135 * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
2136 * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
2137 *
2138 * \param self Indicates the NID of a local interface through which to send
2139 * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
2140 * \param mdh A handle for the MD that describes the memory to be sent. The MD
2141 * must be "free floating" (See LNetMDBind()).
2142 * \param ack Controls whether an acknowledgment is requested.
2143 * Acknowledgments are only sent when they are requested by the initiating
2144 * process and the target MD enables them.
2145 * \param target A process identifier for the target process.
2146 * \param portal The index in the \a target's portal table.
2147 * \param match_bits The match bits to use for MD selection at the target
2148 * process.
2149 * \param offset The offset into the target MD (only used when the target
2150 * MD has the LNET_MD_MANAGE_REMOTE option set).
2151 * \param hdr_data 64 bits of user data that can be included in the message
2152 * header. This data is written to an event queue entry at the target if an
2153 * EQ is present on the matching MD.
2154 *
2155 * \retval 0 Success; only in this case will events be generated
2156 * and logged to the EQ (if one exists).
2157 * \retval -EIO Simulated failure.
2158 * \retval -ENOMEM Memory allocation failure.
2159 * \retval -ENOENT Invalid MD object.
2160 *
2161 * \see lnet_event_t::hdr_data and lnet_event_kind_t.
2162 */
2163 int
2164 LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2165 lnet_process_id_t target, unsigned int portal,
2166 __u64 match_bits, unsigned int offset,
2167 __u64 hdr_data)
2168 {
2169 struct lnet_msg *msg;
2170 struct lnet_libmd *md;
2171 int cpt;
2172 int rc;
2173
2174 LASSERT(the_lnet.ln_refcount > 0);
2175
2176 if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2177 fail_peer(target.nid, 1)) { /* shall we now? */
2178 CERROR("Dropping PUT to %s: simulated failure\n",
2179 libcfs_id2str(target));
2180 return -EIO;
2181 }
2182
2183 msg = lnet_msg_alloc();
2184 if (!msg) {
2185 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2186 libcfs_id2str(target));
2187 return -ENOMEM;
2188 }
2189 msg->msg_vmflush = !!memory_pressure_get();
2190
2191 cpt = lnet_cpt_of_cookie(mdh.cookie);
2192 lnet_res_lock(cpt);
2193
2194 md = lnet_handle2md(&mdh);
2195 if (!md || !md->md_threshold || md->md_me) {
2196 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2197 match_bits, portal, libcfs_id2str(target),
2198 !md ? -1 : md->md_threshold);
2199 if (md && md->md_me)
2200 CERROR("Source MD also attached to portal %d\n",
2201 md->md_me->me_portal);
2202 lnet_res_unlock(cpt);
2203
2204 lnet_msg_free(msg);
2205 return -ENOENT;
2206 }
2207
2208 CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2209
2210 lnet_msg_attach_md(msg, md, 0, 0);
2211
2212 lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2213
2214 msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2215 msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2216 msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2217 msg->msg_hdr.msg.put.hdr_data = hdr_data;
2218
2219 /* NB handles only looked up by creator (no flips) */
2220 if (ack == LNET_ACK_REQ) {
2221 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2222 the_lnet.ln_interface_cookie;
2223 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2224 md->md_lh.lh_cookie;
2225 } else {
2226 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2227 LNET_WIRE_HANDLE_COOKIE_NONE;
2228 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2229 LNET_WIRE_HANDLE_COOKIE_NONE;
2230 }
2231
2232 lnet_res_unlock(cpt);
2233
2234 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2235
2236 rc = lnet_send(self, msg, LNET_NID_ANY);
2237 if (rc) {
2238 CNETERR("Error sending PUT to %s: %d\n",
2239 libcfs_id2str(target), rc);
2240 lnet_finalize(NULL, msg, rc);
2241 }
2242
2243 /* completion will be signalled by an event */
2244 return 0;
2245 }
2246 EXPORT_SYMBOL(LNetPut);
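/*
 * A minimal usage sketch for LNetPut(), following the kernel-doc above:
 * bind a "free floating" MD over a source buffer with LNetMDBind() and
 * issue the PUT with an ACK requested.  It assumes the usual LNetEQAlloc()
 * and LNetMDBind() declarations already pulled in by this file; the event
 * handler, portal index and match bits are hypothetical, and teardown
 * (LNetMDUnlink(), LNetEQFree()) is omitted for brevity.
 */
static void sample_put_event(lnet_event_t *ev)
{
	/* expect LNET_EVENT_SEND and, if the send succeeded, LNET_EVENT_ACK */
	CDEBUG(D_NET, "PUT event %d status %d\n", ev->type, ev->status);
}

static int sample_put(lnet_process_id_t target, void *buf, unsigned int len)
{
	lnet_handle_eq_t eqh;
	lnet_handle_md_t mdh;
	lnet_md_t md;
	int rc;

	rc = LNetEQAlloc(64, sample_put_event, &eqh);
	if (rc)
		return rc;

	memset(&md, 0, sizeof(md));
	md.start = buf;
	md.length = len;
	md.threshold = 2;	/* one SEND event + one ACK event */
	md.options = 0;
	md.eq_handle = eqh;

	rc = LNetMDBind(md, LNET_UNLINK, &mdh);	/* "free floating" MD */
	if (rc)
		return rc;

	/* portal 10, match bits 0x1234 and offset 0 are arbitrary here */
	return LNetPut(LNET_NID_ANY, mdh, LNET_ACK_REQ, target,
		       10, 0x1234, 0, 0);
}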
2247
2248 lnet_msg_t *
2249 lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2250 {
2251 /*
2252 * The LND can DMA directly into the GET MD (i.e. no REPLY msg). This
2253 * returns a msg for the LND to pass to lnet_finalize() when the sink
2254 * data has been received.
2255 *
2256 * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2257 * lnet_finalize() is called on it, so the LND must call this first
2258 */
2259 struct lnet_msg *msg = lnet_msg_alloc();
2260 struct lnet_libmd *getmd = getmsg->msg_md;
2261 lnet_process_id_t peer_id = getmsg->msg_target;
2262 int cpt;
2263
2264 LASSERT(!getmsg->msg_target_is_router);
2265 LASSERT(!getmsg->msg_routing);
2266
2267 if (!msg) {
2268 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2269 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2270 goto drop;
2271 }
2272
2273 cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2274 lnet_res_lock(cpt);
2275
2276 LASSERT(getmd->md_refcount > 0);
2277
2278 if (!getmd->md_threshold) {
2279 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2280 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2281 getmd);
2282 lnet_res_unlock(cpt);
2283 goto drop;
2284 }
2285
2286 LASSERT(!getmd->md_offset);
2287
2288 CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2289 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2290
2291 /* setup information for lnet_build_msg_event */
2292 msg->msg_from = peer_id.nid;
2293 msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2294 msg->msg_hdr.src_nid = peer_id.nid;
2295 msg->msg_hdr.payload_length = getmd->md_length;
2296 msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
2297
2298 lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2299 lnet_res_unlock(cpt);
2300
2301 cpt = lnet_cpt_of_nid(peer_id.nid);
2302
2303 lnet_net_lock(cpt);
2304 lnet_msg_commit(msg, cpt);
2305 lnet_net_unlock(cpt);
2306
2307 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2308
2309 return msg;
2310
2311 drop:
2312 cpt = lnet_cpt_of_nid(peer_id.nid);
2313
2314 lnet_net_lock(cpt);
2315 the_lnet.ln_counters[cpt]->drop_count++;
2316 the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2317 lnet_net_unlock(cpt);
2318
2319 if (msg)
2320 lnet_msg_free(msg);
2321
2322 return NULL;
2323 }
2324 EXPORT_SYMBOL(lnet_create_reply_msg);
2325
2326 void
2327 lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2328 {
2329 /*
2330 * Set the REPLY length now that the RDMA which elides the REPLY
2331 * message has completed and the length is known.
2332 */
2333 LASSERT(reply);
2334 LASSERT(reply->msg_type == LNET_MSG_GET);
2335 LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2336
2337 /*
2338 * NB I trusted my peer to RDMA. If she tells me she's written beyond
2339 * the end of my buffer, I might as well be dead.
2340 */
2341 LASSERT(len <= reply->msg_ev.mlength);
2342
2343 reply->msg_ev.mlength = len;
2344 }
2345 EXPORT_SYMBOL(lnet_set_reply_msg_len);
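/*
 * A rough sketch of how an LND might use lnet_create_reply_msg() and
 * lnet_set_reply_msg_len() when it moves GET data directly into the sink
 * MD (the "optimized" GET with no explicit REPLY on the wire).  Note the
 * ordering: the reply msg must be created before the original GET is
 * finalized, per the caveat above.  sample_lnd_do_rdma() is a hypothetical
 * transport helper, and real LNDs interleave these steps with their
 * completion handlers rather than running them back to back.
 */
static int sample_lnd_optimized_get(lnet_ni_t *ni, lnet_msg_t *getmsg)
{
	lnet_msg_t *reply;
	unsigned int nob = 0;
	int rc;

	/* must happen before lnet_finalize(getmsg), which frees it */
	reply = lnet_create_reply_msg(ni, getmsg);
	if (!reply)
		return -ENOMEM;

	/* hypothetical: transfer the remote data straight into the GET MD */
	rc = sample_lnd_do_rdma(ni, reply, &nob);
	if (!rc)
		lnet_set_reply_msg_len(ni, reply, nob);

	/* the original GET completes ... */
	lnet_finalize(ni, getmsg, rc);
	/* ... and the elided REPLY is reported once the data has landed */
	lnet_finalize(ni, reply, rc);
	return rc;
}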
2346
2347 /**
2348 * Initiate an asynchronous GET operation.
2349 *
2350 * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
2351 * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
2352 * the target node in the REPLY has been written to the local MD.
2353 *
2354 * On the target node, an LNET_EVENT_GET is logged when the GET request
2355 * arrives and is accepted into an MD.
2356 *
2357 * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
2358 * \param mdh A handle for the MD that describes the memory into which the
2359 * requested data will be received. The MD must be "free floating"
2360 * (See LNetMDBind()).
2361 *
2362 * \retval 0 Success; only in this case will events be generated
2363 * and logged to the EQ (if one exists) of the MD.
2364 * \retval -EIO Simulated failure.
2365 * \retval -ENOMEM Memory allocation failure.
2366 * \retval -ENOENT Invalid MD object.
2367 */
2368 int
2369 LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2370 lnet_process_id_t target, unsigned int portal,
2371 __u64 match_bits, unsigned int offset)
2372 {
2373 struct lnet_msg *msg;
2374 struct lnet_libmd *md;
2375 int cpt;
2376 int rc;
2377
2378 LASSERT(the_lnet.ln_refcount > 0);
2379
2380 if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2381 fail_peer(target.nid, 1)) { /* shall we now? */
2382 CERROR("Dropping GET to %s: simulated failure\n",
2383 libcfs_id2str(target));
2384 return -EIO;
2385 }
2386
2387 msg = lnet_msg_alloc();
2388 if (!msg) {
2389 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2390 libcfs_id2str(target));
2391 return -ENOMEM;
2392 }
2393
2394 cpt = lnet_cpt_of_cookie(mdh.cookie);
2395 lnet_res_lock(cpt);
2396
2397 md = lnet_handle2md(&mdh);
2398 if (!md || !md->md_threshold || md->md_me) {
2399 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
2400 match_bits, portal, libcfs_id2str(target),
2401 !md ? -1 : md->md_threshold);
2402 if (md && md->md_me)
2403 CERROR("REPLY MD also attached to portal %d\n",
2404 md->md_me->me_portal);
2405
2406 lnet_res_unlock(cpt);
2407
2408 lnet_msg_free(msg);
2409 return -ENOENT;
2410 }
2411
2412 CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2413
2414 lnet_msg_attach_md(msg, md, 0, 0);
2415
2416 lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2417
2418 msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2419 msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2420 msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2421 msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2422
2423 /* NB handles only looked up by creator (no flips) */
2424 msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2425 the_lnet.ln_interface_cookie;
2426 msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2427 md->md_lh.lh_cookie;
2428
2429 lnet_res_unlock(cpt);
2430
2431 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2432
2433 rc = lnet_send(self, msg, LNET_NID_ANY);
2434 if (rc < 0) {
2435 CNETERR("Error sending GET to %s: %d\n",
2436 libcfs_id2str(target), rc);
2437 lnet_finalize(NULL, msg, rc);
2438 }
2439
2440 /* completion will be signalled by an event */
2441 return 0;
2442 }
2443 EXPORT_SYMBOL(LNetGet);
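/*
 * A minimal usage sketch for LNetGet(), mirroring the PUT example earlier:
 * bind a free-floating sink MD for the returned data and issue the GET.
 * The EQ handle argument, portal index and match bits are hypothetical
 * placeholders, and unlink/free on the error paths is left to the caller.
 */
static int sample_get(lnet_process_id_t target, lnet_handle_eq_t eqh,
		      void *buf, unsigned int len)
{
	lnet_handle_md_t mdh;
	lnet_md_t md;
	int rc;

	memset(&md, 0, sizeof(md));
	md.start = buf;
	md.length = len;
	md.threshold = 2;	/* one SEND event + one REPLY event */
	md.options = 0;
	md.eq_handle = eqh;

	rc = LNetMDBind(md, LNET_UNLINK, &mdh);	/* "free floating" MD */
	if (rc)
		return rc;

	/* portal 10, match bits 0x1234 and offset 0 are arbitrary here */
	return LNetGet(LNET_NID_ANY, mdh, target, 10, 0x1234, 0);
}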
2444
2445 /**
2446 * Calculate distance to node at \a dstnid.
2447 *
2448 * \param dstnid Target NID.
2449 * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
2450 * is saved here.
2451 * \param orderp If not NULL, order of the route to reach \a dstnid is saved
2452 * here.
2453 *
2454 * \retval 0 If \a dstnid belongs to a local interface, and reserved option
2455 * local_nid_dist_zero is set, which is the default.
2456 * \retval positives Distance to target NID, i.e. number of hops plus one.
2457 * \retval -EHOSTUNREACH If \a dstnid is not reachable.
2458 */
2459 int
2460 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2461 {
2462 struct list_head *e;
2463 struct lnet_ni *ni;
2464 lnet_remotenet_t *rnet;
2465 __u32 dstnet = LNET_NIDNET(dstnid);
2466 int hops;
2467 int cpt;
2468 __u32 order = 2;
2469 struct list_head *rn_list;
2470
2471 /*
2472 * if !local_nid_dist_zero, I never return a distance of 0
2473 * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2474 * keep order 0 free for 0@lo and order 1 free for a local NID
2475 * match
2476 */
2477 LASSERT(the_lnet.ln_refcount > 0);
2478
2479 cpt = lnet_net_lock_current();
2480
2481 list_for_each(e, &the_lnet.ln_nis) {
2482 ni = list_entry(e, lnet_ni_t, ni_list);
2483
2484 if (ni->ni_nid == dstnid) {
2485 if (srcnidp)
2486 *srcnidp = dstnid;
2487 if (orderp) {
2488 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2489 *orderp = 0;
2490 else
2491 *orderp = 1;
2492 }
2493 lnet_net_unlock(cpt);
2494
2495 return local_nid_dist_zero ? 0 : 1;
2496 }
2497
2498 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2499 if (srcnidp)
2500 *srcnidp = ni->ni_nid;
2501 if (orderp)
2502 *orderp = order;
2503 lnet_net_unlock(cpt);
2504 return 1;
2505 }
2506
2507 order++;
2508 }
2509
2510 rn_list = lnet_net2rnethash(dstnet);
2511 list_for_each(e, rn_list) {
2512 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2513
2514 if (rnet->lrn_net == dstnet) {
2515 lnet_route_t *route;
2516 lnet_route_t *shortest = NULL;
2517 __u32 shortest_hops = LNET_UNDEFINED_HOPS;
2518 __u32 route_hops;
2519
2520 LASSERT(!list_empty(&rnet->lrn_routes));
2521
2522 list_for_each_entry(route, &rnet->lrn_routes,
2523 lr_list) {
2524 route_hops = route->lr_hops;
2525 if (route_hops == LNET_UNDEFINED_HOPS)
2526 route_hops = 1;
2527 if (!shortest ||
2528 route_hops < shortest_hops) {
2529 shortest = route;
2530 shortest_hops = route_hops;
2531 }
2532 }
2533
2534 LASSERT(shortest);
2535 hops = shortest_hops;
2536 if (srcnidp)
2537 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2538 if (orderp)
2539 *orderp = order;
2540 lnet_net_unlock(cpt);
2541 return hops + 1;
2542 }
2543 order++;
2544 }
2545
2546 lnet_net_unlock(cpt);
2547 return -EHOSTUNREACH;
2548 }
2549 EXPORT_SYMBOL(LNetDist);
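/*
 * A small sketch of interpreting LNetDist() results as documented above:
 * 0 means \a dstnid is a local NID (with local_nid_dist_zero set), 1 means
 * the destination net is directly attached, larger values are router hops
 * plus one, and -EHOSTUNREACH means no route.  Purely illustrative.
 */
static void sample_report_distance(lnet_nid_t dstnid)
{
	lnet_nid_t srcnid = LNET_NID_ANY;
	__u32 order = 0;
	int dist;

	dist = LNetDist(dstnid, &srcnid, &order);
	if (dist < 0) {
		CDEBUG(D_NET, "%s unreachable: %d\n",
		       libcfs_nid2str(dstnid), dist);
		return;
	}

	CDEBUG(D_NET, "%s: distance %d via %s, order %u\n",
	       libcfs_nid2str(dstnid), dist, libcfs_nid2str(srcnid), order);
}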