/*
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
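+ *
+ * Roughly, the single allocation handed back to the RPC layer
+ * looks like this (sizes vary per request; layout illustrative):
+ *
+ *	buffer --> +----------------------+
+ *	           |  RPC call (send)     |  <- rq_send_buf
+ *	           +----------------------+
+ *	           |  RPC reply (receive) |  <- rq_rcv_buf
+ *	           +----------------------+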
*/
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_req *req, *nreq;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ size_t min_size;
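+ /* A task flagged RPC_TASK_SWAPPER is writing out swap pages and
+ * must not sleep in the allocator, hence GFP_ATOMIC; all other
+ * tasks use GFP_NOFS so that memory reclaim triggered by this
+ * allocation cannot recurse into the filesystem. */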
+ gfp_t flags = task->tk_flags & RPC_TASK_SWAPPER ?
+ GFP_ATOMIC : GFP_NOFS;
- req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
return NULL;
- if (size > req->rl_size) {
- dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
- "prog %d vers %d proc %d\n",
- __func__, size, req->rl_size,
- task->tk_client->cl_prog, task->tk_client->cl_vers,
- task->tk_msg.rpc_proc->p_proc);
- /*
- * Outgoing length shortage. Our inline write max must have
- * been configured to perform direct i/o.
- *
- * This is therefore a large metadata operation, and the
- * allocate call was made on the maximum possible message,
- * e.g. containing long filename(s) or symlink data. In
- * fact, while these metadata operations *might* carry
- * large outgoing payloads, they rarely *do*. However, we
- * have to commit to the request here, so reallocate and
- * register it now. The data path will never require this
- * reallocation.
- *
- * If the allocation or registration fails, the RPC framework
- * will (doggedly) retry.
- */
- if (task->tk_flags & RPC_TASK_SWAPPER)
- nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
- else
- nreq = kmalloc(sizeof *req + size, GFP_NOFS);
- if (nreq == NULL)
- goto outfail;
-
- if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
- nreq->rl_base, size + sizeof(struct rpcrdma_req)
- - offsetof(struct rpcrdma_req, rl_base),
- &nreq->rl_handle, &nreq->rl_iov)) {
- kfree(nreq);
- goto outfail;
- }
- rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
- nreq->rl_size = size;
- nreq->rl_niovs = 0;
- nreq->rl_nchunks = 0;
- nreq->rl_buffer = (struct rpcrdma_buffer *)req;
- nreq->rl_reply = req->rl_reply;
- memcpy(nreq->rl_segments,
- req->rl_segments, sizeof nreq->rl_segments);
- /* flag the swap with an unused field */
- nreq->rl_iov.length = 0;
- req->rl_reply = NULL;
- req = nreq;
- }
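+ /* Reuse the existing send regbuf if there is one and it is
+ * already large enough; otherwise allocate a bigger one the
+ * "hardway" (reflected in hardway_register_count below). */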
+ if (req->rl_sendbuf == NULL)
+ goto out_sendbuf;
+ if (size > req->rl_sendbuf->rg_size)
+ goto out_sendbuf;
+
+out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- return req->rl_xdr_buf;
-
-outfail:
+ return req->rl_sendbuf->rg_base;
+
+out_sendbuf:
+ /* XDR encoding and RPC/RDMA marshaling of this request have not
+ * yet occurred. Thus a lower bound is needed to prevent buffer
+ * overrun during marshaling.
+ *
+ * RPC/RDMA marshaling may choose to send payload-bearing ops
+ * inline, if the result is smaller than the inline threshold.
+ * The value of the "size" argument accounts for header
+ * requirements but not for the payload in these cases.
+ *
+ * Likewise, allocate enough space to receive a reply up to the
+ * size of the inline threshold.
+ *
+ * It's unlikely that both the send header and the received
+ * reply will be large, but slush is provided here to allow
+ * flexibility when marshaling.
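+ *
+ * With the default 1024-byte inline thresholds, for example,
+ * min_size works out to 2048 bytes.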
+ */
+ min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+ min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ if (size < min_size)
+ size = min_size;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
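+ /* Record the owning req so that xprt_rdma_free() can get from
+ * the buffer address handed back by the RPC layer to this req. */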
+ rb->rg_owner = req;
+
+ r_xprt->rx_stats.hardway_register_count += size;
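+ /* Release the old send buffer, if any, only now that its
+ * replacement is in hand; a failure above leaves req intact. */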
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ req->rl_sendbuf = rb;
+ goto out;
+
+out_fail:
rpcrdma_buffer_put(req);
- rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ r_xprt->rx_stats.failed_marshal_count++;
return NULL;
}
static void
xprt_rdma_free(void *buffer)
{
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_rep *rep;
+ struct rpcrdma_regbuf *rb;
int i;
if (buffer == NULL)
return;
- req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
- if (req->rl_iov.length == 0) { /* see allocate above */
- r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
- struct rpcrdma_xprt, rx_buf);
- } else
- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- rep = req->rl_reply;
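+ /* The address handed back by xprt_rdma_allocate() is the
+ * rg_base of the send regbuf; recover the regbuf, then the
+ * owning req. */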
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+ req = rb->rg_owner;
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- dprintk("RPC: %s: called on 0x%p%s\n",
- __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+ dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- /*
- * Finish the deregistration. The process is considered
- * complete when the rr_func vector becomes NULL - this
- * was put in place during rpcrdma_reply_handler() - the wait
- * call below will not block if the dereg is "done". If
- * interrupted, our framework will clean up.
- */
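+ /* Deregister any memory regions that were registered for this
+ * request's chunk lists before the req is returned to the
+ * buffer pool. */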
for (i = 0; req->rl_nchunks;) {
--req->rl_nchunks;
i += rpcrdma_deregister_external(
&req->rl_segments[i], r_xprt);
}
- if (req->rl_iov.length == 0) { /* see allocate above */
- struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
- oreq->rl_reply = req->rl_reply;
- (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
- req->rl_handle,
- &req->rl_iov);
- kfree(req);
- req = oreq;
- }
-
- /* Put back request+reply buffers */
rpcrdma_buffer_put(req);
}
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- size_t wlen = 1 << fls(cdata->inline_wsize +
- sizeof(struct rpcrdma_req));
+ size_t wlen = cdata->inline_wsize;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_req *req;
int rc;
rc = -ENOMEM;
- req = kmalloc(wlen, GFP_KERNEL);
+ req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
if (req == NULL)
goto out;
- memset(req, 0, sizeof(struct rpcrdma_req));
+ memset(req, 0, sizeof(*req));
- rc = rpcrdma_register_internal(ia, req->rl_base, wlen -
- offsetof(struct rpcrdma_req, rl_base),
+ rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
&req->rl_handle, &req->rl_iov);
if (rc)
goto out_free;
- req->rl_size = wlen - sizeof(struct rpcrdma_req);
req->rl_buffer = &r_xprt->rx_buf;
return req;
rep = kmalloc(rlen, GFP_KERNEL);
if (rep == NULL)
goto out;
- memset(rep, 0, sizeof(struct rpcrdma_rep));
+ memset(rep, 0, sizeof(*rep));
rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
offsetof(struct rpcrdma_rep, rr_base),
if (!req)
return;
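+ /* rl_sendbuf is allocated separately from the req and may have
+ * been replaced by xprt_rdma_allocate(); release it first. */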
+ rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
kfree(req);
}
struct rpcrdma_buffer *buffers = req->rl_buffer;
unsigned long flags;
- if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
- buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
spin_lock_irqsave(&buffers->rb_lock, flags);
if (buffers->rb_recv_index < buffers->rb_max_requests) {
req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];