/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * verbs.c
 *
 * Encapsulates the major functions managing adapters, endpoints,
 * connections, and buffer memory for the xprtrdma transport.
 */
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif
/*
 * Replies are handled in tasklet context, using a single, global list.
 * The tasklet function simply turns around and calls the reply handler
 * for every reply on the list.
 */

static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

static void
rpcrdma_run_tasklet(unsigned long data)
{
        struct rpcrdma_rep *rep;
        unsigned long flags;

        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        while (!list_empty(&rpcrdma_tasklets_g)) {
                rep = list_entry(rpcrdma_tasklets_g.next,
                                 struct rpcrdma_rep, rr_list);
                list_del(&rep->rr_list);
                spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

                rpcrdma_reply_handler(rep);

                spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        }
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
/* Human-readable names for IB asynchronous events, indexed by
 * enum ib_event_type.
 */
static const char * const async_event[] = {
        "CQ error",
        "QP fatal error",
        "QP request error",
        "QP access error",
        "communication established",
        "send queue drained",
        "path migration successful",
        "path mig error",
        "device fatal error",
        "port active",
        "port error",
        "LID change",
        "P_key change",
        "SM change",
        "SRQ error",
        "SRQ limit reached",
        "last WQE reached",
        "client reregister",
        "GID change",
};

#define ASYNC_MSG(status)                                       \
        ((status) < ARRAY_SIZE(async_event) ?                   \
                async_event[(status)] : "unknown async error")
static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
        unsigned long flags;

        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        list_splice_tail(sched_list, &rpcrdma_tasklets_g);
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
        tasklet_schedule(&rpcrdma_tasklet_g);
}
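/*
 * Reply processing thus happens in two stages: the receive completion
 * handlers below collect completed rpcrdma_rep buffers on a local
 * sched_list, rpcrdma_schedule_tasklet() splices that list onto the
 * global rpcrdma_tasklets_g list, and rpcrdma_run_tasklet() later
 * invokes rpcrdma_reply_handler() for each reply outside of the
 * completion upcall context.
 */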
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        pr_err("RPC: %s: %s on device %s ep %p\n",
               __func__, ASYNC_MSG(event->event),
               event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}
static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        pr_err("RPC: %s: %s on device %s ep %p\n",
               __func__, ASYNC_MSG(event->event),
               event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}
/* Human-readable names for IB work completion statuses, indexed by
 * enum ib_wc_status.
 */
static const char * const wc_status[] = {
        "success",
        "local length error",
        "local QP operation error",
        "local EE context operation error",
        "local protection error",
        "WR flushed",
        "memory management operation error",
        "bad response error",
        "local access error",
        "remote invalid request error",
        "remote access error",
        "remote operation error",
        "transport retry counter exceeded",
        "RNR retry counter exceeded",
        "local RDD violation error",
        "remote invalid RD request",
        "operation aborted",
        "invalid EE context number",
        "invalid EE context state",
        "fatal error",
        "response timeout error",
        "general error",
};

#define COMPLETION_MSG(status)                                  \
        ((status) < ARRAY_SIZE(wc_status) ?                     \
                wc_status[(status)] : "unexpected completion error")
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
        /* WARNING: Only wr_id and status are reliable at this point */
        if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
                if (wc->status != IB_WC_SUCCESS &&
                    wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("RPC: %s: SEND: %s\n",
                               __func__, COMPLETION_MSG(wc->status));
        } else {
                struct rpcrdma_mw *r;

                r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
                r->mw_sendcompletion(wc);
        }
}
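/*
 * The wr_id of a send WR is either RPCRDMA_IGNORE_COMPLETION, for plain
 * RPC sends that need no per-completion work, or a pointer to the
 * rpcrdma_mw whose mw_sendcompletion callback finishes the fast_reg or
 * local_inv work request that was posted.
 */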
static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
        struct ib_wc *wcs;
        int budget, count, rc;

        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
                wcs = ep->rep_send_wcs;

                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
                if (rc <= 0)
                        return rc;

                count = rc;
                while (count-- > 0)
                        rpcrdma_sendcq_process_wc(wcs++);
        } while (rc == RPCRDMA_POLLSIZE && --budget);
        return 0;
}
/*
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
        int rc;

        rc = rpcrdma_sendcq_poll(cq, ep);
        if (rc) {
                dprintk("RPC: %s: ib_poll_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rc = ib_req_notify_cq(cq,
                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
        if (rc == 0)
                return;
        if (rc < 0) {
                dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rpcrdma_sendcq_poll(cq, ep);
}
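/*
 * Both CQ upcalls follow the same pattern: drain the CQ, re-arm it with
 * IB_CQ_REPORT_MISSED_EVENTS, and if the provider reports that
 * completions arrived while the CQ was unarmed (a positive return from
 * ib_req_notify_cq), poll once more so nothing is left stranded.
 */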
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;

        /* WARNING: Only wr_id and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS)
                goto out_fail;

        /* status == SUCCESS means all fields in wc are trustworthy */
        if (wc->opcode != IB_WC_RECV)
                return;

        dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
                __func__, rep, wc->byte_len);

        rep->rr_len = wc->byte_len;
        ib_dma_sync_single_for_cpu(rep->rr_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rep->rr_len, DMA_FROM_DEVICE);
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
        list_add_tail(&rep->rr_list, sched_list);
        return;
out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC: %s: rep %p: %s\n",
                       __func__, rep, COMPLETION_MSG(wc->status));
        rep->rr_len = ~0U;
        goto out_schedule;
}
static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
        struct list_head sched_list;
        struct ib_wc *wcs;
        int budget, count, rc;

        INIT_LIST_HEAD(&sched_list);
        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
                wcs = ep->rep_recv_wcs;

                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
                if (rc <= 0)
                        goto out_schedule;

                count = rc;
                while (count-- > 0)
                        rpcrdma_recvcq_process_wc(wcs++, &sched_list);
        } while (rc == RPCRDMA_POLLSIZE && --budget);
        rc = 0;

out_schedule:
        rpcrdma_schedule_tasklet(&sched_list);
        return rc;
}
/*
 * Handle receive completions.
 *
 * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
        int rc;

        rc = rpcrdma_recvcq_poll(cq, ep);
        if (rc) {
                dprintk("RPC: %s: ib_poll_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rc = ib_req_notify_cq(cq,
                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
        if (rc == 0)
                return;
        if (rc < 0) {
                dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rpcrdma_recvcq_poll(cq, ep);
}
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
        struct ib_wc wc;
        LIST_HEAD(sched_list);

        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
                rpcrdma_recvcq_process_wc(&wc, &sched_list);
        if (!list_empty(&sched_list))
                rpcrdma_schedule_tasklet(&sched_list);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
}
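/*
 * Flushing the CQs this way ensures that receives completed just before
 * a disconnect or reconnect are still handed to the reply tasklet, and
 * that no stale completions remain queued when the QP is torn down.
 */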
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
/* Human-readable names for RDMA CM connection events, indexed by
 * enum rdma_cm_event_type.
 */
static const char * const conn[] = {
        "address resolved",
        "address error",
        "route resolved",
        "route error",
        "connect request",
        "connect response",
        "connect error",
        "unreachable",
        "rejected",
        "established",
        "disconnected",
        "device removal",
        "multicast join",
        "multicast error",
        "address change",
        "timewait exit",
};

#define CONNECTION_MSG(status)                                  \
        ((status) < ARRAY_SIZE(conn) ?                          \
                conn[(status)] : "unrecognized connection error")
#endif
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct rpcrdma_xprt *xprt = id->context;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
        struct ib_qp_attr *attr = &ia->ri_qp_attr;
        struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
        int connstate = 0;

        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                ia->ri_async_rc = 0;
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ADDR_ERROR:
                ia->ri_async_rc = -EHOSTUNREACH;
                dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                ia->ri_async_rc = -ENETUNREACH;
                dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                ib_query_qp(ia->ri_id->qp, attr,
                            IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                            iattr);
                dprintk("RPC: %s: %d responder resources"
                        " (%d initiator)\n",
                        __func__, attr->max_dest_rd_atomic,
                        attr->max_rd_atomic);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
connected:
                dprintk("RPC: %s: %sconnected\n",
                        __func__, connstate > 0 ? "" : "dis");
                ep->rep_connected = connstate;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                /*FALLTHROUGH*/
        default:
                dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
                        __func__, sap, rpc_get_port(sap), ep,
                        CONNECTION_MSG(event->event));
                break;
        }

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (connstate == 1) {
                int ird = attr->max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;

                pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
                        sap, rpc_get_port(sap),
                        ia->ri_device->name,
                        ia->ri_ops->ro_displayname,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
        } else if (connstate < 0) {
                pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
                        sap, rpc_get_port(sap), connstate);
        }
#endif

        return 0;
}
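/*
 * connstate encodes the outcome of a CM event for the endpoint: 1 means
 * the connection was established, a negative errno records why it was
 * lost or could not be set up. The value is copied to ep->rep_connected
 * so that rpcrdma_ep_connect() and rpcrdma_ep_disconnect(), which sleep
 * on rep_connect_wait, can observe the transition.
 */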
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
                        struct rpcrdma_ia *ia, struct sockaddr *addr)
{
        struct rdma_cm_id *id;
        int rc;

        init_completion(&ia->ri_done);

        id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC: %s: rdma_create_id() failed %i\n",
                        __func__, rc);
                return id;
        }

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        return id;

out:
        rdma_destroy_id(id);
        return ERR_PTR(rc);
}
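/*
 * rdma_resolve_addr() and rdma_resolve_route() complete asynchronously
 * through rpcrdma_conn_upcall(), which records the outcome in
 * ia->ri_async_rc and signals ia->ri_done. Pre-setting ri_async_rc to
 * -ETIMEDOUT means a resolution step that never reports back is treated
 * as a timeout once the wait above expires.
 */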
/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
        struct ib_wc wc;
        int count = 0;

        while (1 == ib_poll_cq(cq, 1, &wc))
                ++count;

        if (count)
                dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
                        __func__, count, wc.opcode);
}
/*
 * Exported functions.
 */
/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct ib_device_attr *devattr = &ia->ri_devattr;
        int rc, mem_priv;

        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
                rc = PTR_ERR(ia->ri_id);
                goto out1;
        }
        ia->ri_device = ia->ri_id->device;

        ia->ri_pd = ib_alloc_pd(ia->ri_device);
        if (IS_ERR(ia->ri_pd)) {
                rc = PTR_ERR(ia->ri_pd);
                dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
                        __func__, rc);
                goto out2;
        }

        rc = ib_query_device(ia->ri_device, devattr);
        if (rc) {
                dprintk("RPC: %s: ib_query_device failed %d\n",
                        __func__, rc);
                goto out3;
        }

        if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
                ia->ri_have_dma_lkey = 1;
                ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
        }

        if (memreg == RPCRDMA_FRMR) {
                /* Requires both frmr reg and local dma lkey */
                if (((devattr->device_cap_flags &
                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
                      (devattr->max_fast_reg_page_list_len == 0)) {
                        dprintk("RPC: %s: FRMR registration "
                                "not supported by HCA\n", __func__);
                        memreg = RPCRDMA_MTHCAFMR;
                }
        }
        if (memreg == RPCRDMA_MTHCAFMR) {
                if (!ia->ri_device->alloc_fmr) {
                        dprintk("RPC: %s: MTHCAFMR registration "
                                "not supported by HCA\n", __func__);
                        memreg = RPCRDMA_ALLPHYSICAL;
                }
        }

        /*
         * Optionally obtain an underlying physical identity mapping in
         * order to do a memory window-based bind. This base registration
         * is protected from remote access - that is enabled only by binding
         * for the specific bytes targeted during each RPC operation, and
         * revoked after the corresponding completion similar to a storage
         * adapter.
         */
        switch (memreg) {
        case RPCRDMA_FRMR:
                ia->ri_ops = &rpcrdma_frwr_memreg_ops;
                break;
        case RPCRDMA_ALLPHYSICAL:
                ia->ri_ops = &rpcrdma_physical_memreg_ops;
                mem_priv = IB_ACCESS_LOCAL_WRITE |
                                IB_ACCESS_REMOTE_WRITE |
                                IB_ACCESS_REMOTE_READ;
                goto register_setup;
        case RPCRDMA_MTHCAFMR:
                ia->ri_ops = &rpcrdma_fmr_memreg_ops;
                if (ia->ri_have_dma_lkey)
                        break;
                mem_priv = IB_ACCESS_LOCAL_WRITE;
        register_setup:
                ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
                if (IS_ERR(ia->ri_bind_mem)) {
                        printk(KERN_ALERT "%s: ib_get_dma_mr for "
                                "phys register failed with %lX\n",
                                __func__, PTR_ERR(ia->ri_bind_mem));
                        rc = -ENOMEM;
                        goto out3;
                }
                break;
        default:
                printk(KERN_ERR "RPC: Unsupported memory "
                                "registration mode: %d\n", memreg);
                rc = -EINVAL;
                goto out3;
        }
        dprintk("RPC: %s: memory registration strategy is '%s'\n",
                __func__, ia->ri_ops->ro_displayname);

        /* Else will do memory reg/dereg for each chunk */
        ia->ri_memreg_strategy = memreg;

        rwlock_init(&ia->ri_qplock);
        return 0;

out3:
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
out2:
        rdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
out1:
        return rc;
}
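/*
 * Note the fallback order above: if the HCA cannot support FRMR (fast
 * registration work requests plus a local DMA lkey), the transport falls
 * back to FMR, and if FMRs are also unavailable it falls back to
 * all-physical registration. ia->ri_ops is then set to the matching
 * memory registration ops vector.
 */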
/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC: %s: entering\n", __func__);
        if (ia->ri_bind_mem != NULL) {
                rc = ib_dereg_mr(ia->ri_bind_mem);
                dprintk("RPC: %s: ib_dereg_mr returned %i\n",
                        __func__, rc);
        }

        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
                if (ia->ri_id->qp)
                        rdma_destroy_qp(ia->ri_id);
                rdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }

        /* If the pd is still busy, xprtrdma missed freeing a resource */
        if (ia->ri_pd && !IS_ERR(ia->ri_pd))
                WARN_ON(ib_dealloc_pd(ia->ri_pd));
}
/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
{
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
        int rc, err;

        /* check provider's send/recv wr limits */
        if (cdata->max_requests > devattr->max_qp_wr)
                cdata->max_requests = devattr->max_qp_wr;

        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
        ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;

        if (cdata->padding) {
                ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
                                                      GFP_KERNEL);
                if (IS_ERR(ep->rep_padbuf))
                        return PTR_ERR(ep->rep_padbuf);
        } else
                ep->rep_padbuf = NULL;

        dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
                ep->rep_attr.cap.max_send_wr,
                ep->rep_attr.cap.max_recv_wr,
                ep->rep_attr.cap.max_send_sge,
                ep->rep_attr.cap.max_recv_sge);

        /* set trigger for requesting send completion */
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
                ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
        else if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;
        INIT_CQCOUNT(ep);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

        sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
                              rpcrdma_cq_async_error_upcall, ep,
                              ep->rep_attr.cap.max_send_wr + 1, 0);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC: %s: failed to create send CQ: %i\n",
                        __func__, rc);
                goto out1;
        }

        rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                goto out2;
        }

        recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
                              rpcrdma_cq_async_error_upcall, ep,
                              ep->rep_attr.cap.max_recv_wr + 1, 0);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC: %s: failed to create recv CQ: %i\n",
                        __func__, rc);
                goto out2;
        }

        rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                ib_destroy_cq(recvcq);
                goto out2;
        }

        ep->rep_attr.send_cq = sendcq;
        ep->rep_attr.recv_cq = recvcq;

        /* Initialize cma parameters */

        /* RPC/RDMA does not use private data */
        ep->rep_remote_cma.private_data = NULL;
        ep->rep_remote_cma.private_data_len = 0;

        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
        if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
                ep->rep_remote_cma.responder_resources = 32;
        else
                ep->rep_remote_cma.responder_resources =
                                                devattr->max_qp_rd_atom;

        ep->rep_remote_cma.retry_count = 7;
        ep->rep_remote_cma.flow_control = 0;
        ep->rep_remote_cma.rnr_retry_count = 0;

        return 0;

out2:
        err = ib_destroy_cq(sendcq);
        if (err)
                dprintk("RPC: %s: ib_destroy_cq returned %i\n",
                        __func__, err);
out1:
        rpcrdma_free_regbuf(ia, ep->rep_padbuf);
        return rc;
}
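/*
 * rep_cqinit is the budget of unsignaled sends: rpcrdma_ep_post() uses
 * DECR_CQCOUNT() to count posted sends and requests a signaled
 * completion only when the budget is exhausted, so the send CQ is
 * emptied often enough that it cannot overflow while most sends remain
 * unsignaled.
 */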
/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC: %s: entering, connected is %d\n",
                __func__, ep->rep_connected);

        cancel_delayed_work_sync(&ep->rep_connect_worker);

        if (ia->ri_id->qp) {
                rpcrdma_ep_disconnect(ep, ia);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }

        rpcrdma_free_regbuf(ia, ep->rep_padbuf);

        rpcrdma_clean_cq(ep->rep_attr.recv_cq);
        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
        if (rc)
                dprintk("RPC: %s: ib_destroy_cq returned %i\n",
                        __func__, rc);

        rpcrdma_clean_cq(ep->rep_attr.send_cq);
        rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
                dprintk("RPC: %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
}
/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        struct rdma_cm_id *id, *old;
        int retry_count = 0;
        int rc = 0;

        if (ep->rep_connected != 0) {
                struct rpcrdma_xprt *xprt;
retry:
                dprintk("RPC: %s: reconnecting...\n", __func__);

                rpcrdma_ep_disconnect(ep, ia);
                rpcrdma_flush_cqs(ep);

                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                id = rpcrdma_create_id(xprt, ia,
                                (struct sockaddr *)&xprt->rx_data.addr);
                if (IS_ERR(id)) {
                        rc = -EHOSTUNREACH;
                        goto out;
                }
                /* TEMP TEMP TEMP - fail if new device:
                 *    Deregister/remarshal *all* requests!
                 *    Close and recreate adapter, pd, etc!
                 *    Re-determine all attributes still sane!
                 *    More stuff I haven't thought of!
                 */
                if (ia->ri_device != id->device) {
                        printk("RPC: %s: can't reconnect on "
                                "different device!\n", __func__);
                        rdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }
                /* END TEMP */
                rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC: %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        rdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }

                write_lock(&ia->ri_qplock);
                old = ia->ri_id;
                ia->ri_id = id;
                write_unlock(&ia->ri_qplock);

                rdma_destroy_qp(old);
                rdma_destroy_id(old);
        } else {
                dprintk("RPC: %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC: %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        /* do not update ep->rep_connected */
                        return -ENETUNREACH;
                }
        }

        ep->rep_connected = 0;

        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
        if (rc) {
                dprintk("RPC: %s: rdma_connect() failed with %i\n",
                        __func__, rc);
                goto out;
        }

        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

        /*
         * Check state. A non-peer reject indicates no listener
         * (ECONNREFUSED), which may be a transient state. All
         * others indicate a transport condition which has already
         * undergone a best-effort.
         */
        if (ep->rep_connected == -ECONNREFUSED &&
            ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
                dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
                goto retry;
        }
        if (ep->rep_connected <= 0) {
                /* Sometimes, the only way to reliably connect to remote
                 * CMs is to use same nonzero values for ORD and IRD. */
                if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
                    (ep->rep_remote_cma.responder_resources == 0 ||
                     ep->rep_remote_cma.initiator_depth !=
                                ep->rep_remote_cma.responder_resources)) {
                        if (ep->rep_remote_cma.responder_resources == 0)
                                ep->rep_remote_cma.responder_resources = 1;
                        ep->rep_remote_cma.initiator_depth =
                                ep->rep_remote_cma.responder_resources;
                        goto retry;
                }
                rc = ep->rep_connected;
        } else {
                dprintk("RPC: %s: connected\n", __func__);
        }

out:
        if (rc)
                ep->rep_connected = rc;
        return rc;
}
/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        rpcrdma_flush_cqs(ep);
        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                         ep->rep_connected != 1);
                dprintk("RPC: %s: after wait, %sconnected\n", __func__,
                        (ep->rep_connected == 1) ? "still " : "dis");
        } else {
                dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
                ep->rep_connected = rc;
        }
}
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_req *req;

        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);

        req->rl_buffer = &r_xprt->rx_buf;
        return req;
}
static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_rep *rep;
        int rc;

        rc = -ENOMEM;
        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
        if (rep == NULL)
                goto out;

        rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
                                               GFP_KERNEL);
        if (IS_ERR(rep->rr_rdmabuf)) {
                rc = PTR_ERR(rep->rr_rdmabuf);
                goto out_free;
        }

        rep->rr_device = ia->ri_device;
        rep->rr_rxprt = r_xprt;
        return rep;

out_free:
        kfree(rep);
out:
        return ERR_PTR(rc);
}
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        char *p;
        size_t len;
        int i, rc;

        buf->rb_max_requests = cdata->max_requests;
        spin_lock_init(&buf->rb_lock);

        /* Need to allocate:
         *   1. arrays for send and recv pointers
         *   2. arrays of struct rpcrdma_req to fill in pointers
         *   3. array of struct rpcrdma_rep for replies
         * Send/recv buffers in req/rep need to be registered
         */
        len = buf->rb_max_requests *
                (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

        p = kzalloc(len, GFP_KERNEL);
        if (p == NULL) {
                dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
                        __func__, len);
                rc = -ENOMEM;
                goto out;
        }

        buf->rb_pool = p;       /* for freeing it later */

        buf->rb_send_bufs = (struct rpcrdma_req **) p;
        p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
        buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
        p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
                goto out;

        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
                struct rpcrdma_rep *rep;

                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
                        dprintk("RPC: %s: request buffer %d alloc"
                                " failed\n", __func__, i);
                        rc = PTR_ERR(req);
                        goto out;
                }
                buf->rb_send_bufs[i] = req;

                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        dprintk("RPC: %s: reply buffer %d alloc failed\n",
                                __func__, i);
                        rc = PTR_ERR(rep);
                        goto out;
                }
                buf->rb_recv_bufs[i] = rep;
        }

        return 0;
out:
        rpcrdma_buffer_destroy(buf);
        return rc;
}
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
        if (!rep)
                return;

        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
        kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
        if (!req)
                return;

        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
}
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
        int i;

        /* clean up in reverse order from create
         *   1.  recv mr memory (mr free, then kfree)
         *   2.  send mr memory (mr free, then kfree)
         */
        dprintk("RPC: %s: entering\n", __func__);

        for (i = 0; i < buf->rb_max_requests; i++) {
                if (buf->rb_recv_bufs)
                        rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
                if (buf->rb_send_bufs)
                        rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
        }

        ia->ri_ops->ro_destroy(buf);

        kfree(buf->rb_pool);
}
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_mw *mw = NULL;
        unsigned long flags;

        spin_lock_irqsave(&buf->rb_lock, flags);
        if (!list_empty(&buf->rb_mws)) {
                mw = list_first_entry(&buf->rb_mws,
                                      struct rpcrdma_mw, mw_list);
                list_del_init(&mw->mw_list);
        }
        spin_unlock_irqrestore(&buf->rb_lock, flags);

        if (!mw)
                pr_err("RPC: %s: no MWs available\n", __func__);
        return mw;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        unsigned long flags;

        spin_lock_irqsave(&buf->rb_lock, flags);
        list_add_tail(&mw->mw_list, &buf->rb_mws);
        spin_unlock_irqrestore(&buf->rb_lock, flags);
}
static void
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
        buf->rb_send_bufs[--buf->rb_send_index] = req;
        if (req->rl_reply) {
                buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
                req->rl_reply = NULL;
        }
}
/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 *
 * rb_send_index and rb_recv_index MUST always be pointing to the
 * *next* available buffer (non-NULL). They are incremented after
 * removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
        struct rpcrdma_req *req;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);

        if (buffers->rb_send_index == buffers->rb_max_requests) {
                spin_unlock_irqrestore(&buffers->rb_lock, flags);
                dprintk("RPC: %s: out of request buffers\n", __func__);
                return ((struct rpcrdma_req *)NULL);
        }

        req = buffers->rb_send_bufs[buffers->rb_send_index];
        if (buffers->rb_send_index < buffers->rb_recv_index) {
                dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
                        __func__,
                        buffers->rb_recv_index - buffers->rb_send_index);
                req->rl_reply = NULL;
        } else {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

        spin_unlock_irqrestore(&buffers->rb_lock, flags);
        return req;
}
/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        rpcrdma_buffer_put_sendbuf(req, buffers);
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_recv_index < buffers->rb_max_requests) {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
        dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
                seg->mr_offset,
                (unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
                                struct ib_mr **mrp, struct ib_sge *iov)
{
        struct ib_phys_buf ipb;
        struct ib_mr *mr;
        int rc;

        /*
         * All memory passed here was kmalloc'ed, therefore phys-contiguous.
         */
        iov->addr = ib_dma_map_single(ia->ri_device,
                        va, len, DMA_BIDIRECTIONAL);
        if (ib_dma_mapping_error(ia->ri_device, iov->addr))
                return -ENOMEM;

        iov->length = len;

        if (ia->ri_have_dma_lkey) {
                *mrp = NULL;
                iov->lkey = ia->ri_dma_lkey;
                return 0;
        } else if (ia->ri_bind_mem != NULL) {
                *mrp = NULL;
                iov->lkey = ia->ri_bind_mem->lkey;
                return 0;
        }

        ipb.addr = iov->addr;
        ipb.size = iov->length;
        mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
                        IB_ACCESS_LOCAL_WRITE, &iov->addr);

        dprintk("RPC: %s: phys convert: 0x%llx "
                "registered 0x%llx length %d\n",
                __func__, (unsigned long long)ipb.addr,
                (unsigned long long)iov->addr, len);

        if (IS_ERR(mr)) {
                *mrp = NULL;
                rc = PTR_ERR(mr);
                dprintk("RPC: %s: failed with %i\n", __func__, rc);
        } else {
                *mrp = mr;
                iov->lkey = mr->lkey;
                rc = 0;
        }

        return rc;
}
static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
                                struct ib_mr *mr, struct ib_sge *iov)
{
        int rc;

        ib_dma_unmap_single(ia->ri_device,
                        iov->addr, iov->length, DMA_BIDIRECTIONAL);

        if (mr == NULL)
                return 0;

        rc = ib_dereg_mr(mr);
        if (rc)
                dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
        return rc;
}
/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags for the allocation
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
        struct rpcrdma_regbuf *rb;
        int rc;

        rc = -ENOMEM;
        rb = kmalloc(sizeof(*rb) + size, flags);
        if (rb == NULL)
                goto out;

        rb->rg_size = size;
        rb->rg_owner = NULL;
        rc = rpcrdma_register_internal(ia, rb->rg_base, size,
                                       &rb->rg_mr, &rb->rg_iov);
        if (rc)
                goto out_free;

        return rb;

out_free:
        kfree(rb);
out:
        return ERR_PTR(rc);
}
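/*
 * A regbuf is a single kmalloc'd allocation: the rpcrdma_regbuf header
 * (rg_iov, rg_mr, owner and size bookkeeping) sits first, and the
 * DMA-mapped payload starts at rg_base immediately after it, which is
 * why the allocation above is sizeof(*rb) + size.
 */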
/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
        if (rb) {
                rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
                kfree(rb);
        }
}
/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
{
        struct ib_send_wr send_wr, *send_wr_fail;
        struct rpcrdma_rep *rep = req->rl_reply;
        int rc;

        if (rep) {
                rc = rpcrdma_ep_post_recv(ia, ep, rep);
                if (rc)
                        goto out;
                req->rl_reply = NULL;
        }

        send_wr.next = NULL;
        send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
        send_wr.sg_list = req->rl_send_iov;
        send_wr.num_sge = req->rl_niovs;
        send_wr.opcode = IB_WR_SEND;
        if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
                ib_dma_sync_single_for_device(ia->ri_device,
                                              req->rl_send_iov[3].addr,
                                              req->rl_send_iov[3].length,
                                              DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_device,
                                      req->rl_send_iov[1].addr,
                                      req->rl_send_iov[1].length,
                                      DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_device,
                                      req->rl_send_iov[0].addr,
                                      req->rl_send_iov[0].length,
                                      DMA_TO_DEVICE);

        if (DECR_CQCOUNT(ep) > 0)
                send_wr.send_flags = 0;
        else {  /* Provider must take a send completion every now and then */
                INIT_CQCOUNT(ep);
                send_wr.send_flags = IB_SEND_SIGNALED;
        }

        rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
        if (rc)
                dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
                        rc);
out:
        return rc;
}
/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
                     struct rpcrdma_ep *ep,
                     struct rpcrdma_rep *rep)
{
        struct ib_recv_wr recv_wr, *recv_wr_fail;
        int rc;

        recv_wr.next = NULL;
        recv_wr.wr_id = (u64) (unsigned long) rep;
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;

        ib_dma_sync_single_for_cpu(ia->ri_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rdmab_length(rep->rr_rdmabuf),
                                   DMA_BIDIRECTIONAL);

        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
        if (rc)
                dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
                        rc);
        return rc;
}
/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        int bytes, segments;

        bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
        bytes -= RPCRDMA_HDRLEN_MIN;
        if (bytes < sizeof(struct rpcrdma_segment) * 2) {
                pr_warn("RPC: %s: inline threshold too small\n",
                        __func__);
                return 0;
        }

        segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
        dprintk("RPC: %s: max chunk list size = %d segments\n",
                __func__, segments);
        return segments;
}
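/*
 * The calculation rounds the per-segment budget down to a power of two.
 * As an illustration only (the real values come from cdata and the
 * protocol headers): with a 1024-byte inline threshold, a 28-byte
 * minimum header, and 16-byte struct rpcrdma_segment entries,
 * (1024 - 28) / 16 = 62 candidate segments, and
 * 1 << (fls(62) - 1) = 32 is the value returned.
 */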