/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
/*
 * handle replies in tasklet context, using a single, global list
 * rdma tasklet function -- just turn around and call the func
 * for all replies on the list
 */
static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);
static void
rpcrdma_run_tasklet(unsigned long data)
{
	struct rpcrdma_rep *rep;
	unsigned long flags;

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	while (!list_empty(&rpcrdma_tasklets_g)) {
		rep = list_entry(rpcrdma_tasklets_g.next,
				 struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

		rpcrdma_reply_handler(rep);

		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	}
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
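
/* Summary of the deferred reply path implemented above: the recv
 * completion upcall (rpcrdma_recvcq_upcall, below) gathers struct
 * rpcrdma_rep entries onto a local list, then rpcrdma_schedule_tasklet()
 * splices that list onto rpcrdma_tasklets_g and schedules
 * rpcrdma_tasklet_g. The tasklet drains the global list and calls
 * rpcrdma_reply_handler() for each rep, so reply processing runs in
 * softirq context rather than in the provider's completion interrupt.
 */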
static const char * const async_event[] = {
	"CQ error",
	"QP fatal error",
	"QP request error",
	"QP access error",
	"communication established",
	"send queue drained",
	"path migration successful",
	"path mig error",
	"device fatal error",
	"port active",
	"port error",
	"LID change",
	"P_key change",
	"SM change",
	"SRQ error",
	"SRQ limit reached",
	"last WQE reached",
	"client reregister",
	"GID change",
};

#define ASYNC_MSG(status)					\
	((status) < ARRAY_SIZE(async_event) ?			\
		async_event[(status)] : "unknown async error")
static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
	unsigned long flags;

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
	tasklet_schedule(&rpcrdma_tasklet_g);
}
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}
static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ASYNC_MSG(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}
static const char * const wc_status[] = {
	"success",
	"local length error",
	"local QP operation error",
	"local EE context operation error",
	"local protection error",
	"WR flushed",
	"memory management operation error",
	"bad response error",
	"local access error",
	"remote invalid request error",
	"remote access error",
	"remote operation error",
	"transport retry counter exceeded",
	"RNR retry counter exceeded",
	"local RDD violation error",
	"remote invalid RD request",
	"operation aborted",
	"invalid EE context number",
	"invalid EE context state",
	"fatal error",
	"response timeout error",
	"general error",
};

#define COMPLETION_MSG(status)					\
	((status) < ARRAY_SIZE(wc_status) ?			\
		wc_status[(status)] : "unexpected completion error")
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
		if (wc->status != IB_WC_SUCCESS &&
		    wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("RPC:       %s: SEND: %s\n",
			       __func__, COMPLETION_MSG(wc->status));
	} else {
		struct rpcrdma_mw *r;

		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
		r->mw_sendcompletion(wc);
	}
}
static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct ib_wc *wcs;
	int budget, count, rc;

	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_send_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			return rc;

		count = rc;
		while (count-- > 0)
			rpcrdma_sendcq_process_wc(wcs++);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	return 0;
}
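
/* A worked example of the polling budget, using illustrative values:
 * if RPCRDMA_WC_BUDGET were 128 and RPCRDMA_POLLSIZE were 16, the
 * do/while loop above would make at most 128 / 16 = 8 passes, each
 * harvesting up to 16 WCs, so a single upcall reaps at most 128 send
 * completions before control returns and the CQ is re-armed. (The
 * actual constants are defined in xprt_rdma.h.)
 */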
/*
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_sendcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_sendcq_poll(cq, ep);
}
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
	prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
	list_add_tail(&rep->rr_list, sched_list);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC:       %s: rep %p: %s\n",
		       __func__, rep, COMPLETION_MSG(wc->status));
	rep->rr_len = ~0U;
	goto out_schedule;
}
static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
	struct list_head sched_list;
	struct ib_wc *wcs;
	int budget, count, rc;

	INIT_LIST_HEAD(&sched_list);
	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
	do {
		wcs = ep->rep_recv_wcs;

		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
		if (rc <= 0)
			goto out_schedule;

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
	} while (rc == RPCRDMA_POLLSIZE && --budget);
	rc = 0;

out_schedule:
	rpcrdma_schedule_tasklet(&sched_list);
	return rc;
}
/*
 * Handle receive completions.
 *
 * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
	int rc;

	rc = rpcrdma_recvcq_poll(cq, ep);
	if (rc) {
		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rc = ib_req_notify_cq(cq,
			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
	if (rc == 0)
		return;
	if (rc < 0) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		return;
	}

	rpcrdma_recvcq_poll(cq, ep);
}
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;
	LIST_HEAD(sched_list);

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc, &sched_list);
	if (!list_empty(&sched_list))
		rpcrdma_schedule_tasklet(&sched_list);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
}
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char * const conn[] = {
	"address resolved",
	"address error",
	"route resolved",
	"route error",
	"connect request",
	"connect response",
	"connect error",
	"unreachable",
	"rejected",
	"established",
	"disconnected",
	"device removal",
	"multicast join",
	"multicast error",
	"address change",
	"timewait exit",
};

#define CONNECTION_MSG(status)					\
	((status) < ARRAY_SIZE(conn) ?				\
		conn[(status)] : "unrecognized connection error")
#endif
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			CONNECTION_MSG(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
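
/* Note on the sequencing above: rdma_resolve_addr() and
 * rdma_resolve_route() both complete asynchronously through
 * rpcrdma_conn_upcall(), which records the result in ia->ri_async_rc
 * and signals ia->ri_done. Pre-setting ri_async_rc to -ETIMEDOUT
 * before each step means a timed-out wait is reported as a timeout
 * without any extra bookkeeping.
 */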
/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
	struct ib_wc wc;
	int count = 0;

	while (1 == ib_poll_cq(cq, 1, &wc))
		++count;

	if (count)
		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
			__func__, count, wc.opcode);
}
/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	int rc, mem_priv;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct ib_device_attr *devattr = &ia->ri_devattr;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_query_device(ia->ri_device, devattr);
	if (rc) {
		dprintk("RPC:       %s: ib_query_device failed %d\n",
			__func__, rc);
		goto out3;
	}

	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
		ia->ri_have_dma_lkey = 1;
		ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
	}

	if (memreg == RPCRDMA_FRMR) {
		/* Requires both frmr reg and local dma lkey */
		if (((devattr->device_cap_flags &
		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
		      (devattr->max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_ALLPHYSICAL;
		}
	}

	/*
	 * Optionally obtain an underlying physical identity mapping in
	 * order to do a memory window-based bind. This base registration
	 * is protected from remote access - that is enabled only by binding
	 * for the specific bytes targeted during each RPC operation, and
	 * revoked after the corresponding completion similar to a storage
	 * adapter.
	 */
	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		mem_priv = IB_ACCESS_LOCAL_WRITE |
				IB_ACCESS_REMOTE_WRITE |
				IB_ACCESS_REMOTE_READ;
		goto register_setup;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		if (ia->ri_have_dma_lkey)
			break;
		mem_priv = IB_ACCESS_LOCAL_WRITE;
	register_setup:
		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
		if (IS_ERR(ia->ri_bind_mem)) {
			printk(KERN_ALERT "%s: ib_get_dma_mr for "
				"phys register failed with %lX\n",
				__func__, PTR_ERR(ia->ri_bind_mem));
			rc = -ENOMEM;
			goto out3;
		}
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}
/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_bind_mem != NULL) {
		rc = ib_dereg_mr(ia->ri_bind_mem);
		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}

	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		WARN_ON(ib_dealloc_pd(ia->ri_pd));
}
/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_device_attr *devattr = &ia->ri_devattr;
	struct ib_cq *sendcq, *recvcq;
	int rc, err;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > devattr->max_qp_wr)
		cdata->max_requests = devattr->max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	if (cdata->padding) {
		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
						      GFP_KERNEL);
		if (IS_ERR(ep->rep_padbuf))
			return PTR_ERR(ep->rep_padbuf);
	} else
		ep->rep_padbuf = NULL;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
	else if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_send_wr + 1, 0);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		goto out2;
	}

	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, ep,
			      ep->rep_attr.cap.max_recv_wr + 1, 0);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		ib_destroy_cq(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						devattr->max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	err = ib_destroy_cq(sendcq);
	if (err)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, err);
out1:
	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
	return rc;
}
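
/* A sketch of the send-completion trigger arithmetic above, with
 * illustrative numbers: for max_send_wr == 64, rep_cqinit becomes
 * 64/2 - 1 = 31 (capped at RPCRDMA_MAX_UNSIGNALED_SENDS), so roughly
 * every 32nd SEND is posted IB_SEND_SIGNALED by rpcrdma_ep_post()
 * below, keeping the send CQ from overflowing while suppressing most
 * completions.
 */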
/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	rpcrdma_free_regbuf(ia, ep->rep_padbuf);

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	if (rc)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, rc);

	rpcrdma_clean_cq(ep->rep_attr.send_cq);
	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	if (rc)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, rc);
}
/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort recovery.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		dprintk("RPC:       %s: connected\n", __func__);
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}
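
/* Typical reconnect behavior under the rules above, as a sketch:
 * a listener that is not yet up yields -ECONNREFUSED, which retries
 * up to RDMA_CONNECT_RETRY_MAX times; a peer that requires matching
 * ORD/IRD gets one more round of retries with initiator_depth copied
 * from responder_resources. Anything else surfaces the connstate
 * error to the caller.
 */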
/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}
}
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}
static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_rxprt = r_xprt;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	char *p;
	size_t len;
	int i, rc;

	buf->rb_max_requests = cdata->max_requests;
	spin_lock_init(&buf->rb_lock);

	/* Need to allocate:
	 *   1.  arrays for send and recv pointers
	 *   2.  arrays of struct rpcrdma_req to fill in pointers
	 *   3.  array of struct rpcrdma_rep for replies
	 * Send/recv buffers in req/rep need to be registered
	 */
	len = buf->rb_max_requests *
		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

	p = kzalloc(len, GFP_KERNEL);
	if (p == NULL) {
		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
			__func__, len);
		rc = -ENOMEM;
		goto out;
	}
	buf->rb_pool = p;	/* for freeing it later */

	buf->rb_send_bufs = (struct rpcrdma_req **) p;
	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;
		struct rpcrdma_rep *rep;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		buf->rb_send_bufs[i] = req;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		buf->rb_recv_bufs[i] = rep;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	if (!rep)
		return;

	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	if (!req)
		return;

	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	int i;

	/* clean up in reverse order from create
	 *   1.  recv mr memory (mr free, then kfree)
	 *   2.  send mr memory (mr free, then kfree)
	 *   3.  MWs
	 */
	dprintk("RPC:       %s: entering\n", __func__);

	for (i = 0; i < buf->rb_max_requests; i++) {
		if (buf->rb_recv_bufs)
			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
		if (buf->rb_send_bufs)
			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
	}

	ia->ri_ops->ro_destroy(buf);

	kfree(buf->rb_pool);
}
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;
	unsigned long flags;

	spin_lock_irqsave(&buf->rb_lock, flags);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock_irqrestore(&buf->rb_lock, flags);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}
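
/* rpcrdma_get_mw() and rpcrdma_put_mw() (below) form a simple
 * alloc/free pair over buf->rb_mws; a registration op would typically
 * bracket its use of an MW like this (sketch only, error value
 * illustrative):
 *
 *	struct rpcrdma_mw *mw = rpcrdma_get_mw(r_xprt);
 *	if (!mw)
 *		return -ENOBUFS;
 *	... register and use mw ...
 *	rpcrdma_put_mw(r_xprt, mw);
 */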
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long flags;

	spin_lock_irqsave(&buf->rb_lock, flags);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock_irqrestore(&buf->rb_lock, flags);
}
static void
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
	buf->rb_send_bufs[--buf->rb_send_index] = req;
	req->rl_niovs = 0;
	if (req->rl_reply) {
		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
		req->rl_reply = NULL;
	}
}
/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 * Rule:
 *    rb_send_index and rb_recv_index MUST always be pointing to the
 *    *next* available buffer (non-NULL). They are incremented after
 *    removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);

	if (buffers->rb_send_index == buffers->rb_max_requests) {
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
		dprintk("RPC:       %s: out of request buffers\n", __func__);
		return NULL;
	}

	req = buffers->rb_send_bufs[buffers->rb_send_index];
	if (buffers->rb_send_index < buffers->rb_recv_index) {
		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
			__func__,
			buffers->rb_recv_index - buffers->rb_send_index);
		req->rl_reply = NULL;
	} else {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

	spin_unlock_irqrestore(&buffers->rb_lock, flags);
	return req;
}
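
/* Sketch of the expected call pairing: the transport's send path does
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	... marshal, then rpcrdma_ep_post(ia, ep, req) ...
 *	rpcrdma_buffer_put(req);	// on completion or error
 *
 * so rb_send_index/rb_recv_index always point at the next available
 * slot, per the rule in the comment above.
 */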
/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	rpcrdma_buffer_put_sendbuf(req, buffers);
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	if (buffers->rb_recv_index < buffers->rb_max_requests) {
		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
	}
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
	unsigned long flags;

	spin_lock_irqsave(&buffers->rb_lock, flags);
	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
	spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}
static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
				struct ib_mr **mrp, struct ib_sge *iov)
{
	struct ib_phys_buf ipb;
	struct ib_mr *mr;
	int rc;

	/*
	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
	 */
	iov->addr = ib_dma_map_single(ia->ri_device,
			va, len, DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		return -ENOMEM;

	iov->length = len;

	if (ia->ri_have_dma_lkey) {
		*mrp = NULL;
		iov->lkey = ia->ri_dma_lkey;
		return 0;
	} else if (ia->ri_bind_mem != NULL) {
		*mrp = NULL;
		iov->lkey = ia->ri_bind_mem->lkey;
		return 0;
	}

	ipb.addr = iov->addr;
	ipb.size = iov->length;
	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
			IB_ACCESS_LOCAL_WRITE, &iov->addr);

	dprintk("RPC:       %s: phys convert: 0x%llx "
			"registered 0x%llx length %d\n",
			__func__, (unsigned long long)ipb.addr,
			(unsigned long long)iov->addr, len);

	if (IS_ERR(mr)) {
		*mrp = NULL;
		rc = PTR_ERR(mr);
		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
	} else {
		*mrp = mr;
		iov->lkey = mr->lkey;
		rc = 0;
	}

	return rc;
}
static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
				struct ib_mr *mr, struct ib_sge *iov)
{
	int rc;

	ib_dma_unmap_single(ia->ri_device,
			iov->addr, iov->length, DMA_BIDIRECTIONAL);

	if (NULL == mr)
		return 0;

	rc = ib_dereg_mr(mr);
	if (rc)
		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
	return rc;
}
/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	int rc;

	rc = -ENOMEM;
	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	rb->rg_size = size;
	rb->rg_owner = NULL;
	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
				       &rb->rg_mr, &rb->rg_iov);
	if (rc)
		goto out_free;

	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(rc);
}
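
/* Usage sketch: callers pair this with rpcrdma_free_regbuf(), e.g.
 *
 *	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	... post rb->rg_iov as a send or recv SGE ...
 *	rpcrdma_free_regbuf(ia, rb);
 *
 * The rdmab_addr()/rdmab_length() helpers in xprt_rdma.h read back
 * the registered rg_iov fields.
 */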
/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb) {
		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
		kfree(rb);
	}
}
/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	int rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
	send_wr.sg_list = req->rl_send_iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;
	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
		ib_dma_sync_single_for_device(ia->ri_device,
					      req->rl_send_iov[3].addr,
					      req->rl_send_iov[3].length,
					      DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_device,
				      req->rl_send_iov[1].addr,
				      req->rl_send_iov[1].length,
				      DMA_TO_DEVICE);
	ib_dma_sync_single_for_device(ia->ri_device,
				      req->rl_send_iov[0].addr,
				      req->rl_send_iov[0].length,
				      DMA_TO_DEVICE);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}
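
/* Example of the signaling cadence above, with illustrative numbers:
 * with ep->rep_cqinit == 31, DECR_CQCOUNT() stays positive for 31
 * consecutive sends (posted unsignaled); the 32nd send resets the
 * counter via INIT_CQCOUNT() and is posted IB_SEND_SIGNALED,
 * producing the occasional completion that rpcrdma_sendcq_process_wc()
 * expects.
 */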
/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}
/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	int bytes, segments;

	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
	bytes -= RPCRDMA_HDRLEN_MIN;
	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
		pr_warn("RPC:       %s: inline threshold too small\n",
			__func__);
		return 0;
	}

	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
	dprintk("RPC:       %s: max chunk list size = %d segments\n",
		__func__, segments);
	return segments;
}