2 * Copyright (c) 2015 Oracle. All rights reserved.
4 * Support for backward direction RPCs on RPC/RDMA (server-side).
7 #include <linux/sunrpc/svc_rdma.h>
10 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
12 #undef SVCRDMA_BACKCHANNEL_DEBUG
14 int svc_rdma_handle_bc_reply(struct rpc_xprt
*xprt
, struct rpcrdma_msg
*rmsgp
,
15 struct xdr_buf
*rcvbuf
)
17 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
18 struct kvec
*dst
, *src
= &rcvbuf
->head
[0];
27 p
= (__be32
*)src
->iov_base
;
31 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
32 pr_info("%s: xid=%08x, length=%zu\n",
33 __func__
, be32_to_cpu(xid
), len
);
34 pr_info("%s: RPC/RDMA: %*ph\n",
35 __func__
, (int)RPCRDMA_HDRLEN_MIN
, rmsgp
);
36 pr_info("%s: RPC: %*ph\n",
37 __func__
, (int)len
, p
);
41 if (src
->iov_len
< 24)
44 spin_lock_bh(&xprt
->transport_lock
);
45 req
= xprt_lookup_rqst(xprt
, xid
);
49 dst
= &req
->rq_private_buf
.head
[0];
50 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
, sizeof(struct xdr_buf
));
51 if (dst
->iov_len
< len
)
53 memcpy(dst
->iov_base
, p
, len
);
55 credits
= be32_to_cpu(rmsgp
->rm_credit
);
57 credits
= 1; /* don't deadlock */
58 else if (credits
> r_xprt
->rx_buf
.rb_bc_max_requests
)
59 credits
= r_xprt
->rx_buf
.rb_bc_max_requests
;
62 xprt
->cwnd
= credits
<< RPC_CWNDSHIFT
;
63 if (xprt
->cwnd
> cwnd
)
64 xprt_release_rqst_cong(req
->rq_task
);
67 xprt_complete_rqst(req
->rq_task
, rcvbuf
->len
);
71 spin_unlock_bh(&xprt
->transport_lock
);
76 dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
81 dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
82 xprt
, be32_to_cpu(xid
));
87 /* Send a backwards direction RPC call.
89 * Caller holds the connection's mutex and has already marshaled
90 * the RPC/RDMA request.
92 * This is similar to svc_rdma_reply, but takes an rpc_rqst
93 * instead, does not support chunks, and avoids blocking memory
96 * XXX: There is still an opportunity to block in svc_rdma_send()
97 * if there are no SQ entries to post the Send. This may occur if
98 * the adapter has a small maximum SQ depth.
100 static int svc_rdma_bc_sendto(struct svcxprt_rdma
*rdma
,
101 struct rpc_rqst
*rqst
)
103 struct xdr_buf
*sndbuf
= &rqst
->rq_snd_buf
;
104 struct svc_rdma_op_ctxt
*ctxt
;
105 struct svc_rdma_req_map
*vec
;
106 struct ib_send_wr send_wr
;
109 vec
= svc_rdma_get_req_map(rdma
);
110 ret
= svc_rdma_map_xdr(rdma
, sndbuf
, vec
);
114 /* Post a recv buffer to handle the reply for this request. */
115 ret
= svc_rdma_post_recv(rdma
, GFP_NOIO
);
117 pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
119 pr_err("svcrdma: closing transport %p.\n", rdma
);
120 set_bit(XPT_CLOSE
, &rdma
->sc_xprt
.xpt_flags
);
125 ctxt
= svc_rdma_get_context(rdma
);
126 ctxt
->pages
[0] = virt_to_page(rqst
->rq_buffer
);
129 ctxt
->wr_op
= IB_WR_SEND
;
130 ctxt
->direction
= DMA_TO_DEVICE
;
131 ctxt
->sge
[0].lkey
= rdma
->sc_pd
->local_dma_lkey
;
132 ctxt
->sge
[0].length
= sndbuf
->len
;
134 ib_dma_map_page(rdma
->sc_cm_id
->device
, ctxt
->pages
[0], 0,
135 sndbuf
->len
, DMA_TO_DEVICE
);
136 if (ib_dma_mapping_error(rdma
->sc_cm_id
->device
, ctxt
->sge
[0].addr
)) {
140 atomic_inc(&rdma
->sc_dma_used
);
142 memset(&send_wr
, 0, sizeof(send_wr
));
143 send_wr
.wr_id
= (unsigned long)ctxt
;
144 send_wr
.sg_list
= ctxt
->sge
;
146 send_wr
.opcode
= IB_WR_SEND
;
147 send_wr
.send_flags
= IB_SEND_SIGNALED
;
149 ret
= svc_rdma_send(rdma
, &send_wr
);
156 svc_rdma_put_req_map(rdma
, vec
);
157 dprintk("svcrdma: %s returns %d\n", __func__
, ret
);
161 svc_rdma_unmap_dma(ctxt
);
162 svc_rdma_put_context(ctxt
, 1);
166 /* Server-side transport endpoint wants a whole page for its send
167 * buffer. The client RPC code constructs the RPC header in this
168 * buffer before it invokes ->send_request.
170 * Returns NULL if there was a temporary allocation failure.
173 xprt_rdma_bc_allocate(struct rpc_task
*task
, size_t size
)
175 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
176 struct svc_xprt
*sxprt
= rqst
->rq_xprt
->bc_xprt
;
177 struct svcxprt_rdma
*rdma
;
180 rdma
= container_of(sxprt
, struct svcxprt_rdma
, sc_xprt
);
182 /* Prevent an infinite loop: try to make this case work */
183 if (size
> PAGE_SIZE
)
184 WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
187 page
= alloc_page(RPCRDMA_DEF_GFP
);
191 return page_address(page
);
/* Free a send buffer handed out by xprt_rdma_bc_allocate().
 * Intentionally does nothing: the Send completion path releases
 * the op context and the page backing the buffer.
 */
static void
xprt_rdma_bc_free(void *buffer)
{
	/* No-op: ctxt and page have already been freed. */
}
201 rpcrdma_bc_send_request(struct svcxprt_rdma
*rdma
, struct rpc_rqst
*rqst
)
203 struct rpc_xprt
*xprt
= rqst
->rq_xprt
;
204 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
205 struct rpcrdma_msg
*headerp
= (struct rpcrdma_msg
*)rqst
->rq_buffer
;
208 /* Space in the send buffer for an RPC/RDMA header is reserved
209 * via xprt->tsh_size.
211 headerp
->rm_xid
= rqst
->rq_xid
;
212 headerp
->rm_vers
= rpcrdma_version
;
213 headerp
->rm_credit
= cpu_to_be32(r_xprt
->rx_buf
.rb_bc_max_requests
);
214 headerp
->rm_type
= rdma_msg
;
215 headerp
->rm_body
.rm_chunks
[0] = xdr_zero
;
216 headerp
->rm_body
.rm_chunks
[1] = xdr_zero
;
217 headerp
->rm_body
.rm_chunks
[2] = xdr_zero
;
219 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
220 pr_info("%s: %*ph\n", __func__
, 64, rqst
->rq_buffer
);
223 rc
= svc_rdma_bc_sendto(rdma
, rqst
);
225 goto drop_connection
;
229 dprintk("svcrdma: failed to send bc call\n");
230 xprt_disconnect_done(xprt
);
234 /* Send an RPC call on the passive end of a transport
238 xprt_rdma_bc_send_request(struct rpc_task
*task
)
240 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
241 struct svc_xprt
*sxprt
= rqst
->rq_xprt
->bc_xprt
;
242 struct svcxprt_rdma
*rdma
;
245 dprintk("svcrdma: sending bc call with xid: %08x\n",
246 be32_to_cpu(rqst
->rq_xid
));
248 if (!mutex_trylock(&sxprt
->xpt_mutex
)) {
249 rpc_sleep_on(&sxprt
->xpt_bc_pending
, task
, NULL
);
250 if (!mutex_trylock(&sxprt
->xpt_mutex
))
252 rpc_wake_up_queued_task(&sxprt
->xpt_bc_pending
, task
);
256 rdma
= container_of(sxprt
, struct svcxprt_rdma
, sc_xprt
);
257 if (!test_bit(XPT_DEAD
, &sxprt
->xpt_flags
))
258 ret
= rpcrdma_bc_send_request(rdma
, rqst
);
260 mutex_unlock(&sxprt
->xpt_mutex
);
/* The backchannel shares the forward channel's connection, so
 * closing is a no-op beyond a debug trace.
 */
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}
274 xprt_rdma_bc_put(struct rpc_xprt
*xprt
)
276 dprintk("svcrdma: %s: xprt %p\n", __func__
, xprt
);
279 module_put(THIS_MODULE
);
282 static struct rpc_xprt_ops xprt_rdma_bc_procs
= {
283 .reserve_xprt
= xprt_reserve_xprt_cong
,
284 .release_xprt
= xprt_release_xprt_cong
,
285 .alloc_slot
= xprt_alloc_slot
,
286 .release_request
= xprt_release_rqst_cong
,
287 .buf_alloc
= xprt_rdma_bc_allocate
,
288 .buf_free
= xprt_rdma_bc_free
,
289 .send_request
= xprt_rdma_bc_send_request
,
290 .set_retrans_timeout
= xprt_set_retrans_timeout_def
,
291 .close
= xprt_rdma_bc_close
,
292 .destroy
= xprt_rdma_bc_put
,
293 .print_stats
= xprt_rdma_print_stats
296 static const struct rpc_timeout xprt_rdma_bc_timeout
= {
297 .to_initval
= 60 * HZ
,
298 .to_maxval
= 60 * HZ
,
301 /* It shouldn't matter if the number of backchannel session slots
302 * doesn't match the number of RPC/RDMA credits. That just means
303 * one or the other will have extra slots that aren't used.
305 static struct rpc_xprt
*
306 xprt_setup_rdma_bc(struct xprt_create
*args
)
308 struct rpc_xprt
*xprt
;
309 struct rpcrdma_xprt
*new_xprt
;
311 if (args
->addrlen
> sizeof(xprt
->addr
)) {
312 dprintk("RPC: %s: address too large\n", __func__
);
313 return ERR_PTR(-EBADF
);
316 xprt
= xprt_alloc(args
->net
, sizeof(*new_xprt
),
317 RPCRDMA_MAX_BC_REQUESTS
,
318 RPCRDMA_MAX_BC_REQUESTS
);
320 dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
322 return ERR_PTR(-ENOMEM
);
325 xprt
->timeout
= &xprt_rdma_bc_timeout
;
326 xprt_set_bound(xprt
);
327 xprt_set_connected(xprt
);
328 xprt
->bind_timeout
= RPCRDMA_BIND_TO
;
329 xprt
->reestablish_timeout
= RPCRDMA_INIT_REEST_TO
;
330 xprt
->idle_timeout
= RPCRDMA_IDLE_DISC_TO
;
332 xprt
->prot
= XPRT_TRANSPORT_BC_RDMA
;
333 xprt
->tsh_size
= RPCRDMA_HDRLEN_MIN
/ sizeof(__be32
);
334 xprt
->ops
= &xprt_rdma_bc_procs
;
336 memcpy(&xprt
->addr
, args
->dstaddr
, args
->addrlen
);
337 xprt
->addrlen
= args
->addrlen
;
338 xprt_rdma_format_addresses(xprt
, (struct sockaddr
*)&xprt
->addr
);
341 xprt
->max_payload
= xprt_rdma_max_inline_read
;
343 new_xprt
= rpcx_to_rdmax(xprt
);
344 new_xprt
->rx_buf
.rb_bc_max_requests
= xprt
->max_reqs
;
347 args
->bc_xprt
->xpt_bc_xprt
= xprt
;
348 xprt
->bc_xprt
= args
->bc_xprt
;
350 if (!try_module_get(THIS_MODULE
))
353 /* Final put for backchannel xprt is in __svc_rdma_free */
358 xprt_rdma_free_addresses(xprt
);
359 args
->bc_xprt
->xpt_bc_xprt
= NULL
;
362 return ERR_PTR(-EINVAL
);
365 struct xprt_class xprt_rdma_bc
= {
366 .list
= LIST_HEAD_INIT(xprt_rdma_bc
.list
),
367 .name
= "rdma backchannel",
368 .owner
= THIS_MODULE
,
369 .ident
= XPRT_TRANSPORT_BC_RDMA
,
370 .setup
= xprt_setup_rdma_bc
,