#include <linux/bpf.h>
#include <linux/errno.h>
#include <linux/errqueue.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/netdevice.h>
#include <linux/poll.h>
#include <linux/rculist.h>
#include <linux/skbuff.h>
#include <linux/socket.h>
#include <linux/uaccess.h>
#include <linux/workqueue.h>
#include <net/kcm.h>
#include <net/netns/generic.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <uapi/linux/kcm.h>
unsigned int kcm_net_id;

static struct kmem_cache *kcm_psockp __read_mostly;
static struct kmem_cache *kcm_muxp __read_mostly;
static struct workqueue_struct *kcm_wq;
static inline struct kcm_sock *kcm_sk(const struct sock *sk)
{
	return (struct kcm_sock *)sk;
}

static inline struct kcm_tx_msg *kcm_tx_msg(struct sk_buff *skb)
{
	return (struct kcm_tx_msg *)skb->cb;
}

static inline struct kcm_rx_msg *kcm_rx_msg(struct sk_buff *skb)
{
	return (struct kcm_rx_msg *)((void *)skb->cb +
				     offsetof(struct qdisc_skb_cb, data));
}
static void report_csk_error(struct sock *csk, int err)
{
	csk->sk_err = EPIPE;
	csk->sk_error_report(csk);
}
/* Callback lock held */
static void kcm_abort_rx_psock(struct kcm_psock *psock, int err,
			       struct sk_buff *head)
{
	struct sock *csk = psock->sk;

	/* Unrecoverable error in receive */

	if (psock->rx_stopped)
		return;

	psock->rx_stopped = 1;
	KCM_STATS_INCR(psock->stats.rx_aborts);

	/* Report an error on the lower socket */
	report_csk_error(csk, err);
}
static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
			       bool wakeup_kcm)
{
	struct sock *csk = psock->sk;
	struct kcm_mux *mux = psock->mux;

	/* Unrecoverable error in transmit */

	spin_lock_bh(&mux->lock);

	if (psock->tx_stopped) {
		spin_unlock_bh(&mux->lock);
		return;
	}

	psock->tx_stopped = 1;
	KCM_STATS_INCR(psock->stats.tx_aborts);

	if (!psock->tx_kcm) {
		/* Take off psocks_avail list */
		list_del(&psock->psock_avail_list);
	} else if (wakeup_kcm) {
		/* In this case psock is being aborted while outside of
		 * write_msgs and psock is reserved. Schedule tx_work
		 * to handle the failure there. Need to commit tx_stopped
		 * before queuing work.
		 */
		smp_mb();

		queue_work(kcm_wq, &psock->tx_kcm->tx_work);
	}

	spin_unlock_bh(&mux->lock);

	/* Report error on lower socket */
	report_csk_error(csk, err);
}
/* RX mux lock held. */
static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
				    struct kcm_psock *psock)
{
	KCM_STATS_ADD(mux->stats.rx_bytes,
		      psock->stats.rx_bytes - psock->saved_rx_bytes);
	mux->stats.rx_msgs +=
		psock->stats.rx_msgs - psock->saved_rx_msgs;
	psock->saved_rx_msgs = psock->stats.rx_msgs;
	psock->saved_rx_bytes = psock->stats.rx_bytes;
}
static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
				    struct kcm_psock *psock)
{
	KCM_STATS_ADD(mux->stats.tx_bytes,
		      psock->stats.tx_bytes - psock->saved_tx_bytes);
	mux->stats.tx_msgs +=
		psock->stats.tx_msgs - psock->saved_tx_msgs;
	psock->saved_tx_msgs = psock->stats.tx_msgs;
	psock->saved_tx_bytes = psock->stats.tx_bytes;
}
static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb);
/* KCM is ready to receive messages on its queue-- either the KCM is new or
 * has become unblocked after being blocked on full socket buffer. Queue any
 * pending ready messages on a psock. RX mux lock held.
 */
static void kcm_rcv_ready(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;
	struct kcm_psock *psock;
	struct sk_buff *skb;

	if (unlikely(kcm->rx_wait || kcm->rx_psock || kcm->rx_disabled))
		return;

	while (unlikely((skb = __skb_dequeue(&mux->rx_hold_queue)))) {
		if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
			/* Assuming buffer limit has been reached */
			skb_queue_head(&mux->rx_hold_queue, skb);
			WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
			return;
		}
	}

	while (!list_empty(&mux->psocks_ready)) {
		psock = list_first_entry(&mux->psocks_ready, struct kcm_psock,
					 psock_ready_list);

		if (kcm_queue_rcv_skb(&kcm->sk, psock->ready_rx_msg)) {
			/* Assuming buffer limit has been reached */
			WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
			return;
		}

		/* Consumed the ready message on the psock. Schedule rx_work to
		 * get more messages.
		 */
		list_del(&psock->psock_ready_list);
		psock->ready_rx_msg = NULL;

		/* Commit clearing of ready_rx_msg for queuing work */
		smp_mb();

		queue_work(kcm_wq, &psock->rx_work);
	}

	/* Buffer limit is okay now, add to ready list */
	list_add_tail(&kcm->wait_rx_list,
		      &kcm->mux->kcm_rx_waiters);
	kcm->rx_wait = true;
}
static void kcm_rfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	struct kcm_sock *kcm = kcm_sk(sk);
	struct kcm_mux *mux = kcm->mux;
	unsigned int len = skb->truesize;

	sk_mem_uncharge(sk, len);
	atomic_sub(len, &sk->sk_rmem_alloc);

	/* For reading rx_wait and rx_psock without holding lock */
	smp_mb__after_atomic();

	if (!kcm->rx_wait && !kcm->rx_psock &&
	    sk_rmem_alloc_get(sk) < sk->sk_rcvlowat) {
		spin_lock_bh(&mux->rx_lock);
		kcm_rcv_ready(kcm);
		spin_unlock_bh(&mux->rx_lock);
	}
}
static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
{
	struct sk_buff_head *list = &sk->sk_receive_queue;

	if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf)
		return -ENOMEM;

	if (!sk_rmem_schedule(sk, skb, skb->truesize))
		return -ENOBUFS;

	skb->dev = NULL;

	skb_orphan(skb);
	skb->sk = sk;
	skb->destructor = kcm_rfree;
	atomic_add(skb->truesize, &sk->sk_rmem_alloc);
	sk_mem_charge(sk, skb->truesize);

	skb_queue_tail(list, skb);

	if (!sock_flag(sk, SOCK_DEAD))
		sk->sk_data_ready(sk);

	return 0;
}
/* Requeue received messages for a kcm socket to other kcm sockets. This is
 * called when a kcm socket is receive disabled.
 *
 * RX mux lock held.
 */
static void requeue_rx_msgs(struct kcm_mux *mux, struct sk_buff_head *head)
{
	struct sk_buff *skb;
	struct kcm_sock *kcm;

	while ((skb = __skb_dequeue(head))) {
		/* Reset destructor to avoid calling kcm_rcv_ready */
		skb->destructor = sock_rfree;
		skb_orphan(skb);
try_again:
		if (list_empty(&mux->kcm_rx_waiters)) {
			skb_queue_tail(&mux->rx_hold_queue, skb);
			continue;
		}

		kcm = list_first_entry(&mux->kcm_rx_waiters,
				       struct kcm_sock, wait_rx_list);

		if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
			/* Should mean socket buffer full */
			list_del(&kcm->wait_rx_list);
			kcm->rx_wait = false;

			/* Commit rx_wait to read in kcm_free */
			smp_wmb();

			goto try_again;
		}
	}
}
/* Lower sock lock held */
static struct kcm_sock *reserve_rx_kcm(struct kcm_psock *psock,
				       struct sk_buff *head)
{
	struct kcm_mux *mux = psock->mux;
	struct kcm_sock *kcm;

	WARN_ON(psock->ready_rx_msg);

	if (psock->rx_kcm)
		return psock->rx_kcm;

	spin_lock_bh(&mux->rx_lock);

	if (psock->rx_kcm) {
		spin_unlock_bh(&mux->rx_lock);
		return psock->rx_kcm;
	}

	kcm_update_rx_mux_stats(mux, psock);

	if (list_empty(&mux->kcm_rx_waiters)) {
		psock->ready_rx_msg = head;
		list_add_tail(&psock->psock_ready_list,
			      &mux->psocks_ready);
		spin_unlock_bh(&mux->rx_lock);
		return NULL;
	}

	kcm = list_first_entry(&mux->kcm_rx_waiters,
			       struct kcm_sock, wait_rx_list);
	list_del(&kcm->wait_rx_list);
	kcm->rx_wait = false;

	psock->rx_kcm = kcm;
	kcm->rx_psock = psock;

	spin_unlock_bh(&mux->rx_lock);

	return kcm;
}
static void kcm_done(struct kcm_sock *kcm);

static void kcm_done_work(struct work_struct *w)
{
	kcm_done(container_of(w, struct kcm_sock, done_work));
}
/* Lower sock held */
static void unreserve_rx_kcm(struct kcm_psock *psock,
			     bool rcv_ready)
{
	struct kcm_sock *kcm = psock->rx_kcm;
	struct kcm_mux *mux = psock->mux;

	if (!kcm)
		return;

	spin_lock_bh(&mux->rx_lock);

	psock->rx_kcm = NULL;
	kcm->rx_psock = NULL;

	/* Commit kcm->rx_psock before sk_rmem_alloc_get to sync with
	 * kcm_rfree
	 */
	smp_mb();

	if (unlikely(kcm->done)) {
		spin_unlock_bh(&mux->rx_lock);

		/* Need to run kcm_done in a task since we need to acquire
		 * callback locks which may already be held here.
		 */
		INIT_WORK(&kcm->done_work, kcm_done_work);
		schedule_work(&kcm->done_work);
		return;
	}

	if (unlikely(kcm->rx_disabled)) {
		requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
	} else if (rcv_ready || unlikely(!sk_rmem_alloc_get(&kcm->sk))) {
		/* Check for degenerative race with rx_wait that all
		 * data was dequeued (accounted for in kcm_rfree).
		 */
		kcm_rcv_ready(kcm);
	}

	spin_unlock_bh(&mux->rx_lock);
}
/* Macro to invoke filter function. */
#define KCM_RUN_FILTER(prog, ctx) \
	(*prog->bpf_func)(ctx, prog->insnsi)
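
/*
 * Illustrative sketch (not part of this file): the program invoked through
 * KCM_RUN_FILTER is an ordinary BPF_PROG_TYPE_SOCKET_FILTER program whose
 * return value is taken as the total length of the message that begins at
 * the current position in the TCP byte stream (0 means "need more header",
 * see kcm_tcp_recv below). Assuming a hypothetical framing where each
 * message starts with a 2-byte big-endian length field covering only the
 * payload, a parser in the style of the samples/bpf programs could be:
 *
 *	int kcm_parse_msg_len(struct __sk_buff *skb)
 *	{
 *		// message = 2-byte header + the payload length it encodes
 *		return load_half(skb, 0) + 2;
 *	}
 *
 * load_half() is the LD_ABS halfword helper used by the classic samples;
 * the framing above is only an example, real protocols supply their own
 * length computation.
 */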
/* Lower socket lock held */
static int kcm_tcp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
			unsigned int orig_offset, size_t orig_len)
{
	struct kcm_psock *psock = (struct kcm_psock *)desc->arg.data;
	struct kcm_rx_msg *rxm;
	struct kcm_sock *kcm;
	struct sk_buff *head, *skb;
	size_t eaten = 0, cand_len;
	ssize_t extra;
	int err;
	bool cloned_orig = false;

	if (psock->ready_rx_msg)
		return 0;

	head = psock->rx_skb_head;
	if (head) {
		/* Message already in progress */

		rxm = kcm_rx_msg(head);
		if (unlikely(rxm->early_eaten)) {
			/* Already some number of bytes on the receive sock
			 * data saved in rx_skb_head, just indicate they
			 * are consumed.
			 */
			eaten = orig_len <= rxm->early_eaten ?
				orig_len : rxm->early_eaten;
			rxm->early_eaten -= eaten;

			return eaten;
		}

		if (unlikely(orig_offset)) {
			/* Getting data with a non-zero offset when a message is
			 * in progress is not expected. If it does happen, we
			 * need to clone and pull since we can't deal with
			 * offsets in the skbs for a message except in the head.
			 */
			orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
			if (!orig_skb) {
				KCM_STATS_INCR(psock->stats.rx_mem_fail);
				desc->error = -ENOMEM;
				return 0;
			}
			if (!pskb_pull(orig_skb, orig_offset)) {
				KCM_STATS_INCR(psock->stats.rx_mem_fail);
				kfree_skb(orig_skb);
				desc->error = -ENOMEM;
				return 0;
			}
			cloned_orig = true;
			orig_offset = 0;
		}

		if (!psock->rx_skb_nextp) {
			/* We are going to append to the frags_list of head.
			 * Need to unshare the frag_list.
			 */
			err = skb_unclone(head, GFP_ATOMIC);
			if (err) {
				KCM_STATS_INCR(psock->stats.rx_mem_fail);
				desc->error = err;
				return 0;
			}

			if (unlikely(skb_shinfo(head)->frag_list)) {
				/* We can't append to an sk_buff that already
				 * has a frag_list. We create a new head, point
				 * the frag_list of that to the old head, and
				 * then are able to use the old head->next for
				 * appending to the message.
				 */
				if (WARN_ON(head->next)) {
					desc->error = -EINVAL;
					return 0;
				}

				skb = alloc_skb(0, GFP_ATOMIC);
				if (!skb) {
					KCM_STATS_INCR(psock->stats.rx_mem_fail);
					desc->error = -ENOMEM;
					return 0;
				}
				skb->len = head->len;
				skb->data_len = head->len;
				skb->truesize = head->truesize;
				*kcm_rx_msg(skb) = *kcm_rx_msg(head);
				psock->rx_skb_nextp = &head->next;
				skb_shinfo(skb)->frag_list = head;
				psock->rx_skb_head = skb;
			} else {
				psock->rx_skb_nextp =
					&skb_shinfo(head)->frag_list;
			}
		}
	}

	while (eaten < orig_len) {
		/* Always clone since we will consume something */
		skb = skb_clone(orig_skb, GFP_ATOMIC);
		if (!skb) {
			KCM_STATS_INCR(psock->stats.rx_mem_fail);
			desc->error = -ENOMEM;
			break;
		}

		cand_len = orig_len - eaten;

		head = psock->rx_skb_head;
		if (!head) {
			head = skb;
			psock->rx_skb_head = head;
			/* Will set rx_skb_nextp on next packet if needed */
			psock->rx_skb_nextp = NULL;
			rxm = kcm_rx_msg(head);
			memset(rxm, 0, sizeof(*rxm));
			rxm->offset = orig_offset + eaten;
		} else {
			/* Unclone since we may be appending to an skb that we
			 * already share a frag_list with.
			 */
			err = skb_unclone(skb, GFP_ATOMIC);
			if (err) {
				KCM_STATS_INCR(psock->stats.rx_mem_fail);
				desc->error = err;
				break;
			}

			rxm = kcm_rx_msg(head);
			*psock->rx_skb_nextp = skb;
			psock->rx_skb_nextp = &skb->next;
			head->data_len += skb->len;
			head->len += skb->len;
			head->truesize += skb->truesize;
		}

		if (!rxm->full_len) {
			ssize_t len;

			len = KCM_RUN_FILTER(psock->bpf_prog, head);

			if (!len) {
				/* Need more header to determine length */
				rxm->accum_len += cand_len;
				eaten += cand_len;
				KCM_STATS_INCR(psock->stats.rx_need_more_hdr);
				WARN_ON(eaten != orig_len);
				break;
			} else if (len > psock->sk->sk_rcvbuf) {
				/* Message length exceeds maximum allowed */
				KCM_STATS_INCR(psock->stats.rx_msg_too_big);
				desc->error = -EMSGSIZE;
				psock->rx_skb_head = NULL;
				kcm_abort_rx_psock(psock, EMSGSIZE, head);
				break;
			} else if (len <= (ssize_t)head->len -
					  skb->len - rxm->offset) {
				/* Length must be into new skb (and also
				 * be greater than zero)
				 */
				KCM_STATS_INCR(psock->stats.rx_bad_hdr_len);
				desc->error = -EPROTO;
				psock->rx_skb_head = NULL;
				kcm_abort_rx_psock(psock, EPROTO, head);
				break;
			}

			rxm->full_len = len;
		}

		extra = (ssize_t)(rxm->accum_len + cand_len) - rxm->full_len;

		if (extra < 0) {
			/* Message not complete yet. */
			if (rxm->full_len - rxm->accum_len >
			    tcp_inq(psock->sk)) {
				/* Don't have the whole messages in the socket
				 * buffer. Set psock->rx_need_bytes to wait for
				 * the rest of the message. Also, set "early
				 * eaten" since we've already buffered the skb
				 * but don't consume yet per tcp_read_sock.
				 */
				psock->rx_need_bytes = rxm->full_len -
						       rxm->accum_len;
				rxm->accum_len += cand_len;
				rxm->early_eaten = cand_len;
				KCM_STATS_ADD(psock->stats.rx_bytes, cand_len);
				desc->count = 0; /* Stop reading socket */
				break;
			}
			rxm->accum_len += cand_len;
			eaten += cand_len;
			WARN_ON(eaten != orig_len);
			break;
		}

		/* Positive extra indicates more bytes than needed for the
		 * message
		 */

		WARN_ON(extra > cand_len);

		eaten += (cand_len - extra);

		/* Hurray, we have a new message! */
		psock->rx_skb_head = NULL;
		KCM_STATS_INCR(psock->stats.rx_msgs);

try_queue:
		kcm = reserve_rx_kcm(psock, head);
		if (!kcm) {
			/* Unable to reserve a KCM, message is held in psock. */
			break;
		}

		if (kcm_queue_rcv_skb(&kcm->sk, head)) {
			/* Should mean socket buffer full */
			unreserve_rx_kcm(psock, false);
			goto try_queue;
		}
	}

	if (cloned_orig)
		kfree_skb(orig_skb);

	KCM_STATS_ADD(psock->stats.rx_bytes, eaten);

	return eaten;
}
/* Called with lock held on lower socket */
static int psock_tcp_read_sock(struct kcm_psock *psock)
{
	read_descriptor_t desc;

	desc.arg.data = psock;
	desc.error = 0;
	desc.count = 1; /* give more than one skb per call */

	/* sk should be locked here, so okay to do tcp_read_sock */
	tcp_read_sock(psock->sk, &desc, kcm_tcp_recv);

	unreserve_rx_kcm(psock, true);

	return desc.error;
}
/* Lower sock lock held */
static void psock_tcp_data_ready(struct sock *sk)
{
	struct kcm_psock *psock;

	read_lock_bh(&sk->sk_callback_lock);

	psock = (struct kcm_psock *)sk->sk_user_data;
	if (unlikely(!psock || psock->rx_stopped))
		goto out;

	if (psock->ready_rx_msg)
		goto out;

	if (psock->rx_need_bytes) {
		if (tcp_inq(sk) >= psock->rx_need_bytes)
			psock->rx_need_bytes = 0;
		else
			goto out;
	}

	if (psock_tcp_read_sock(psock) == -ENOMEM)
		queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);

out:
	read_unlock_bh(&sk->sk_callback_lock);
}
static void do_psock_rx_work(struct kcm_psock *psock)
{
	read_descriptor_t rd_desc;
	struct sock *csk = psock->sk;

	/* We need the read lock to synchronize with psock_tcp_data_ready. We
	 * need the socket lock for calling tcp_read_sock.
	 */
	lock_sock(csk);
	read_lock_bh(&csk->sk_callback_lock);

	if (unlikely(csk->sk_user_data != psock))
		goto out;

	if (unlikely(psock->rx_stopped))
		goto out;

	if (psock->ready_rx_msg)
		goto out;

	rd_desc.arg.data = psock;

	if (psock_tcp_read_sock(psock) == -ENOMEM)
		queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);

out:
	read_unlock_bh(&csk->sk_callback_lock);
	release_sock(csk);
}

static void psock_rx_work(struct work_struct *w)
{
	do_psock_rx_work(container_of(w, struct kcm_psock, rx_work));
}

static void psock_rx_delayed_work(struct work_struct *w)
{
	do_psock_rx_work(container_of(w, struct kcm_psock,
				      rx_delayed_work.work));
}
static void psock_tcp_state_change(struct sock *sk)
{
	/* TCP only does a POLLIN for a half close. Do a POLLHUP here
	 * since application will normally not poll with POLLIN
	 * on the TCP sockets.
	 */

	report_csk_error(sk, EPIPE);
}
static void psock_tcp_write_space(struct sock *sk)
{
	struct kcm_psock *psock;
	struct kcm_mux *mux;
	struct kcm_sock *kcm;

	read_lock_bh(&sk->sk_callback_lock);

	psock = (struct kcm_psock *)sk->sk_user_data;
	if (unlikely(!psock))
		goto out;

	mux = psock->mux;

	spin_lock_bh(&mux->lock);

	/* Check if the socket is reserved so someone is waiting for sending. */
	kcm = psock->tx_kcm;
	if (kcm)
		queue_work(kcm_wq, &kcm->tx_work);

	spin_unlock_bh(&mux->lock);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
static void unreserve_psock(struct kcm_sock *kcm);
/* kcm sock is locked. */
static struct kcm_psock *reserve_psock(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;
	struct kcm_psock *psock;

	psock = kcm->tx_psock;

	smp_rmb(); /* Must read tx_psock before tx_wait */

	if (psock) {
		WARN_ON(kcm->tx_wait);
		if (unlikely(psock->tx_stopped))
			unreserve_psock(kcm);
		else
			return kcm->tx_psock;
	}

	spin_lock_bh(&mux->lock);

	/* Check again under lock to see if a psock was reserved for this
	 * kcm via unreserve_psock.
	 */
	psock = kcm->tx_psock;
	if (unlikely(psock)) {
		WARN_ON(kcm->tx_wait);
		spin_unlock_bh(&mux->lock);
		return kcm->tx_psock;
	}

	if (!list_empty(&mux->psocks_avail)) {
		psock = list_first_entry(&mux->psocks_avail,
					 struct kcm_psock,
					 psock_avail_list);
		list_del(&psock->psock_avail_list);
		if (kcm->tx_wait) {
			list_del(&kcm->wait_psock_list);
			kcm->tx_wait = false;
		}
		kcm->tx_psock = psock;
		psock->tx_kcm = kcm;
		KCM_STATS_INCR(psock->stats.reserved);
	} else if (!kcm->tx_wait) {
		list_add_tail(&kcm->wait_psock_list,
			      &mux->kcm_tx_waiters);
		kcm->tx_wait = true;
	}

	spin_unlock_bh(&mux->lock);

	return psock;
}
static void psock_now_avail(struct kcm_psock *psock)
{
	struct kcm_mux *mux = psock->mux;
	struct kcm_sock *kcm;

	if (list_empty(&mux->kcm_tx_waiters)) {
		list_add_tail(&psock->psock_avail_list,
			      &mux->psocks_avail);
	} else {
		kcm = list_first_entry(&mux->kcm_tx_waiters,
				       struct kcm_sock,
				       wait_psock_list);
		list_del(&kcm->wait_psock_list);
		kcm->tx_wait = false;
		psock->tx_kcm = kcm;

		/* Commit before changing tx_psock since that is read in
		 * reserve_psock before queuing work.
		 */
		smp_mb();

		kcm->tx_psock = psock;
		KCM_STATS_INCR(psock->stats.reserved);
		queue_work(kcm_wq, &kcm->tx_work);
	}
}
/* kcm sock is locked. */
static void unreserve_psock(struct kcm_sock *kcm)
{
	struct kcm_psock *psock;
	struct kcm_mux *mux = kcm->mux;

	spin_lock_bh(&mux->lock);

	psock = kcm->tx_psock;

	if (WARN_ON(!psock)) {
		spin_unlock_bh(&mux->lock);
		return;
	}

	smp_rmb(); /* Read tx_psock before tx_wait */

	kcm_update_tx_mux_stats(mux, psock);

	WARN_ON(kcm->tx_wait);

	kcm->tx_psock = NULL;
	psock->tx_kcm = NULL;
	KCM_STATS_INCR(psock->stats.unreserved);

	if (unlikely(psock->tx_stopped)) {
		if (psock->done) {
			/* Deferred free */
			list_del(&psock->psock_list);
			mux->psocks_cnt--;
			fput(psock->sk->sk_socket->file);
			kmem_cache_free(kcm_psockp, psock);
		}

		/* Don't put back on available list */

		spin_unlock_bh(&mux->lock);

		return;
	}

	psock_now_avail(psock);

	spin_unlock_bh(&mux->lock);
}
static void kcm_report_tx_retry(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;

	spin_lock_bh(&mux->lock);
	KCM_STATS_INCR(mux->stats.tx_retries);
	spin_unlock_bh(&mux->lock);
}
/* Write any messages ready on the kcm socket. Called with kcm sock lock
 * held. Return bytes actually sent or error.
 */
static int kcm_write_msgs(struct kcm_sock *kcm)
{
	struct sock *sk = &kcm->sk;
	struct kcm_psock *psock;
	struct sk_buff *skb, *head;
	struct kcm_tx_msg *txm;
	unsigned short fragidx, frag_offset;
	unsigned int sent, total_sent = 0;
	int ret = 0;

	kcm->tx_wait_more = false;
	psock = kcm->tx_psock;
	if (unlikely(psock && psock->tx_stopped)) {
		/* A reserved psock was aborted asynchronously. Unreserve
		 * it and we'll retry the message.
		 */
		unreserve_psock(kcm);
		kcm_report_tx_retry(kcm);
		if (skb_queue_empty(&sk->sk_write_queue))
			return 0;

		kcm_tx_msg(skb_peek(&sk->sk_write_queue))->sent = 0;

	} else if (skb_queue_empty(&sk->sk_write_queue)) {
		return 0;
	}

	head = skb_peek(&sk->sk_write_queue);
	txm = kcm_tx_msg(head);

	if (txm->sent) {
		/* Send of first skbuff in queue already in progress */
		if (WARN_ON(!psock)) {
			ret = -EINVAL;
			goto out;
		}
		sent = txm->sent;
		frag_offset = txm->frag_offset;
		fragidx = txm->fragidx;
		skb = txm->frag_skb;

		goto do_frag;
	}

try_again:
	psock = reserve_psock(kcm);
	if (!psock)
		goto out;

	do {
		skb = head;
		txm = kcm_tx_msg(head);
		sent = 0;

do_frag_list:
		if (WARN_ON(!skb_shinfo(skb)->nr_frags)) {
			ret = -EINVAL;
			goto out;
		}

		for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags;
		     fragidx++) {
			skb_frag_t *frag;

			frag_offset = 0;
do_frag:
			frag = &skb_shinfo(skb)->frags[fragidx];
			if (WARN_ON(!frag->size)) {
				ret = -EINVAL;
				goto out;
			}

			ret = kernel_sendpage(psock->sk->sk_socket,
					      frag->page.p,
					      frag->page_offset + frag_offset,
					      frag->size - frag_offset,
					      MSG_DONTWAIT);
			if (ret <= 0) {
				if (ret == -EAGAIN) {
					/* Save state to try again when there's
					 * write space on the socket
					 */
					txm->sent = sent;
					txm->frag_offset = frag_offset;
					txm->fragidx = fragidx;
					txm->frag_skb = skb;

					ret = 0;
					goto out;
				}

				/* Hard failure in sending message, abort this
				 * psock since it has lost framing
				 * synchronization and retry sending the
				 * message from the beginning.
				 */
				kcm_abort_tx_psock(psock, ret ? -ret : EPIPE,
						   true);
				unreserve_psock(kcm);

				txm->sent = 0;
				kcm_report_tx_retry(kcm);
				ret = 0;

				goto try_again;
			}

			sent += ret;
			frag_offset += ret;
			KCM_STATS_ADD(psock->stats.tx_bytes, ret);
			if (frag_offset < frag->size) {
				/* Not finished with this frag */
				goto do_frag;
			}
		}

		if (skb == head) {
			if (skb_has_frag_list(skb)) {
				skb = skb_shinfo(skb)->frag_list;
				goto do_frag_list;
			}
		} else if (skb->next) {
			skb = skb->next;
			goto do_frag_list;
		}

		/* Successfully sent the whole packet, account for it. */
		skb_dequeue(&sk->sk_write_queue);
		kfree_skb(head);
		sk->sk_wmem_queued -= sent;
		total_sent += sent;
		KCM_STATS_INCR(psock->stats.tx_msgs);
	} while ((head = skb_peek(&sk->sk_write_queue)));

out:
	if (!head) {
		/* Done with all queued messages. */
		WARN_ON(!skb_queue_empty(&sk->sk_write_queue));
		unreserve_psock(kcm);
	}

	/* Check if write space is available */
	sk->sk_write_space(sk);

	return total_sent ? : ret;
}
static void kcm_tx_work(struct work_struct *w)
{
	struct kcm_sock *kcm = container_of(w, struct kcm_sock, tx_work);
	struct sock *sk = &kcm->sk;
	int err;

	lock_sock(sk);

	/* Primarily for SOCK_DGRAM sockets, also handle asynchronous tx
	 * aborts
	 */
	err = kcm_write_msgs(kcm);
	if (err < 0) {
		/* Hard failure in write, report error on KCM socket */
		pr_warn("KCM: Hard failure on kcm_write_msgs %d\n", err);
		report_csk_error(&kcm->sk, -err);
		goto out;
	}

	/* Primarily for SOCK_SEQPACKET sockets */
	if (likely(sk->sk_socket) &&
	    test_bit(SOCK_NOSPACE, &sk->sk_socket->flags)) {
		clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_space(sk);
	}

out:
	release_sock(sk);
}
static void kcm_push(struct kcm_sock *kcm)
{
	if (kcm->tx_wait_more)
		kcm_write_msgs(kcm);
}
static ssize_t kcm_sendpage(struct socket *sock, struct page *page,
			    int offset, size_t size, int flags)
{
	struct sock *sk = sock->sk;
	struct kcm_sock *kcm = kcm_sk(sk);
	struct sk_buff *skb = NULL, *head = NULL;
	long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	bool eor;
	int err = 0;
	int i;

	if (flags & MSG_SENDPAGE_NOTLAST)
		flags |= MSG_MORE;

	/* No MSG_EOR from splice, only look at MSG_MORE */
	eor = !(flags & MSG_MORE);

	lock_sock(sk);

	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	err = -EPIPE;
	if (sk->sk_err)
		goto out_error;

	if (kcm->seq_skb) {
		/* Previously opened message */
		head = kcm->seq_skb;
		skb = kcm_tx_msg(head)->last_skb;
		i = skb_shinfo(skb)->nr_frags;

		if (skb_can_coalesce(skb, i, page, offset)) {
			skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], size);
			skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;
			goto coalesced;
		}

		if (i >= MAX_SKB_FRAGS) {
			struct sk_buff *tskb;

			tskb = alloc_skb(0, sk->sk_allocation);
			while (!tskb) {
				kcm_push(kcm);
				err = sk_stream_wait_memory(sk, &timeo);
				if (err)
					goto out_error;

				tskb = alloc_skb(0, sk->sk_allocation);
			}

			if (head == skb)
				skb_shinfo(head)->frag_list = tskb;
			else
				skb->next = tskb;

			skb = tskb;
			skb->ip_summed = CHECKSUM_UNNECESSARY;
			i = 0;
		}
	} else {
		/* Call the sk_stream functions to manage the sndbuf mem. */
		if (!sk_stream_memory_free(sk)) {
			kcm_push(kcm);
			set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			err = sk_stream_wait_memory(sk, &timeo);
			if (err)
				goto out_error;
		}

		head = alloc_skb(0, sk->sk_allocation);
		while (!head) {
			kcm_push(kcm);
			err = sk_stream_wait_memory(sk, &timeo);
			if (err)
				goto out_error;

			head = alloc_skb(0, sk->sk_allocation);
		}

		skb = head;
		i = 0;
	}

	get_page(page);
	skb_fill_page_desc(skb, i, page, offset, size);
	skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;

coalesced:
	skb->len += size;
	skb->data_len += size;
	skb->truesize += size;
	sk->sk_wmem_queued += size;
	sk_mem_charge(sk, size);

	if (head != skb) {
		head->len += size;
		head->data_len += size;
		head->truesize += size;
	}

	if (eor) {
		bool not_busy = skb_queue_empty(&sk->sk_write_queue);

		/* Message complete, queue it on send buffer */
		__skb_queue_tail(&sk->sk_write_queue, head);
		kcm->seq_skb = NULL;
		KCM_STATS_INCR(kcm->stats.tx_msgs);

		if (flags & MSG_BATCH) {
			kcm->tx_wait_more = true;
		} else if (kcm->tx_wait_more || not_busy) {
			err = kcm_write_msgs(kcm);
			if (err < 0) {
				/* We got a hard error in write_msgs but have
				 * already queued this message. Report an error
				 * in the socket, but don't affect return value
				 * from sendmsg
				 */
				pr_warn("KCM: Hard failure on kcm_write_msgs\n");
				report_csk_error(&kcm->sk, -err);
			}
		}
	} else {
		/* Message not complete, save state */
		kcm->seq_skb = head;
		kcm_tx_msg(head)->last_skb = skb;
	}

	KCM_STATS_ADD(kcm->stats.tx_bytes, size);

	release_sock(sk);
	return size;

out_error:
	kcm_push(kcm);

	err = sk_stream_error(sk, flags, err);

	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
		sk->sk_write_space(sk);

	release_sock(sk);
	return err;
}
static int kcm_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	struct sock *sk = sock->sk;
	struct kcm_sock *kcm = kcm_sk(sk);
	struct sk_buff *skb = NULL, *head = NULL;
	size_t copy, copied = 0;
	long timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
	int eor = (sock->type == SOCK_DGRAM) ?
		  !(msg->msg_flags & MSG_MORE) : !!(msg->msg_flags & MSG_EOR);
	int err = -EPIPE;

	lock_sock(sk);

	/* Per tcp_sendmsg this should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err)
		goto out_error;

	if (kcm->seq_skb) {
		/* Previously opened message */
		head = kcm->seq_skb;
		skb = kcm_tx_msg(head)->last_skb;
		goto start;
	}

	/* Call the sk_stream functions to manage the sndbuf mem. */
	if (!sk_stream_memory_free(sk)) {
		kcm_push(kcm);
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		err = sk_stream_wait_memory(sk, &timeo);
		if (err)
			goto out_error;
	}

	/* New message, alloc head skb */
	head = alloc_skb(0, sk->sk_allocation);
	while (!head) {
		kcm_push(kcm);
		err = sk_stream_wait_memory(sk, &timeo);
		if (err)
			goto out_error;

		head = alloc_skb(0, sk->sk_allocation);
	}

	skb = head;

	/* Set ip_summed to CHECKSUM_UNNECESSARY to avoid calling
	 * csum_and_copy_from_iter from skb_do_copy_data_nocache.
	 */
	skb->ip_summed = CHECKSUM_UNNECESSARY;

start:
	while (msg_data_left(msg)) {
		bool merge = true;
		int i = skb_shinfo(skb)->nr_frags;
		struct page_frag *pfrag = sk_page_frag(sk);

		if (!sk_page_frag_refill(sk, pfrag))
			goto wait_for_memory;

		if (!skb_can_coalesce(skb, i, pfrag->page,
				      pfrag->offset)) {
			if (i == MAX_SKB_FRAGS) {
				struct sk_buff *tskb;

				tskb = alloc_skb(0, sk->sk_allocation);
				if (!tskb)
					goto wait_for_memory;

				if (head == skb)
					skb_shinfo(head)->frag_list = tskb;
				else
					skb->next = tskb;

				skb = tskb;
				skb->ip_summed = CHECKSUM_UNNECESSARY;
				continue;
			}
			merge = false;
		}

		copy = min_t(int, msg_data_left(msg),
			     pfrag->size - pfrag->offset);

		if (!sk_wmem_schedule(sk, copy))
			goto wait_for_memory;

		err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
					       pfrag->page,
					       pfrag->offset,
					       copy);
		if (err)
			goto out_error;

		/* Update the skb. */
		if (merge) {
			skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
		} else {
			skb_fill_page_desc(skb, i, pfrag->page,
					   pfrag->offset, copy);
			get_page(pfrag->page);
		}

		pfrag->offset += copy;
		copied += copy;
		if (head != skb) {
			head->len += copy;
			head->data_len += copy;
		}

		continue;

wait_for_memory:
		kcm_push(kcm);
		err = sk_stream_wait_memory(sk, &timeo);
		if (err)
			goto out_error;
	}

	if (eor) {
		bool not_busy = skb_queue_empty(&sk->sk_write_queue);

		/* Message complete, queue it on send buffer */
		__skb_queue_tail(&sk->sk_write_queue, head);
		kcm->seq_skb = NULL;
		KCM_STATS_INCR(kcm->stats.tx_msgs);

		if (msg->msg_flags & MSG_BATCH) {
			kcm->tx_wait_more = true;
		} else if (kcm->tx_wait_more || not_busy) {
			err = kcm_write_msgs(kcm);
			if (err < 0) {
				/* We got a hard error in write_msgs but have
				 * already queued this message. Report an error
				 * in the socket, but don't affect return value
				 * from sendmsg
				 */
				pr_warn("KCM: Hard failure on kcm_write_msgs\n");
				report_csk_error(&kcm->sk, -err);
			}
		}
	} else {
		/* Message not complete, save state */
partial_message:
		kcm->seq_skb = head;
		kcm_tx_msg(head)->last_skb = skb;
	}

	KCM_STATS_ADD(kcm->stats.tx_bytes, copied);

	release_sock(sk);
	return copied;

out_error:
	kcm_push(kcm);

	if (copied && sock->type == SOCK_SEQPACKET) {
		/* Wrote some bytes before encountering an
		 * error, return partial success.
		 */
		goto partial_message;
	}

	if (head != kcm->seq_skb)
		kfree_skb(head);

	err = sk_stream_error(sk, msg->msg_flags, err);

	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
		sk->sk_write_space(sk);

	release_sock(sk);
	return err;
}
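
/*
 * Usage sketch from user space (assumption: "kcmfd" is an already attached
 * KCM socket, message framing is handled by the attached BPF program). On
 * SOCK_DGRAM a sendmsg() without MSG_MORE completes one framed message; on
 * SOCK_SEQPACKET the message is completed by MSG_EOR. MSG_BATCH queues the
 * completed message without flushing, so several messages can be pushed to
 * the transport together:
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 *	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
 *
 *	// complete a message but keep it queued
 *	if (sendmsg(kcmfd, &msg, MSG_EOR | MSG_BATCH) < 0)
 *		perror("sendmsg");
 *	// complete a second message and flush both
 *	if (sendmsg(kcmfd, &msg, MSG_EOR) < 0)
 *		perror("sendmsg");
 */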
static struct sk_buff *kcm_wait_data(struct sock *sk, int flags,
				     long timeo, int *err)
{
	struct sk_buff *skb;

	while (!(skb = skb_peek(&sk->sk_receive_queue))) {
		if (sk->sk_err) {
			*err = sock_error(sk);
			return NULL;
		}

		if (sock_flag(sk, SOCK_DONE))
			return NULL;

		if ((flags & MSG_DONTWAIT) || !timeo) {
			*err = -EAGAIN;
			return NULL;
		}

		sk_wait_data(sk, &timeo, NULL);

		/* Handle signals */
		if (signal_pending(current)) {
			*err = sock_intr_errno(timeo);
			return NULL;
		}
	}

	return skb;
}
static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
		       size_t len, int flags)
{
	struct sock *sk = sock->sk;
	struct kcm_sock *kcm = kcm_sk(sk);
	int err = 0;
	long timeo;
	struct kcm_rx_msg *rxm;
	int copied = 0;
	struct sk_buff *skb;

	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	lock_sock(sk);

	skb = kcm_wait_data(sk, flags, timeo, &err);
	if (!skb)
		goto out;

	/* Okay, have a message on the receive queue */

	rxm = kcm_rx_msg(skb);

	if (len > rxm->full_len)
		len = rxm->full_len;

	err = skb_copy_datagram_msg(skb, rxm->offset, msg, len);
	if (err < 0)
		goto out;

	copied = len;
	if (likely(!(flags & MSG_PEEK))) {
		KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
		if (copied < rxm->full_len) {
			if (sock->type == SOCK_DGRAM) {
				/* Truncated message */
				msg->msg_flags |= MSG_TRUNC;
				goto msg_finished;
			}
			rxm->offset += copied;
			rxm->full_len -= copied;
		} else {
msg_finished:
			/* Finished with message */
			msg->msg_flags |= MSG_EOR;
			KCM_STATS_INCR(kcm->stats.rx_msgs);
			skb_unlink(skb, &sk->sk_receive_queue);
			kfree_skb(skb);
		}
	}

out:
	release_sock(sk);

	return copied ? : err;
}
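
/*
 * Usage sketch from user space (hypothetical descriptor "kcmfd"): each
 * recvmsg() returns at most one parsed message. MSG_EOR in msg_flags means
 * the message was fully consumed; MSG_TRUNC (SOCK_DGRAM only) means the
 * supplied buffer was smaller than the message:
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
 *	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
 *	ssize_t n = recvmsg(kcmfd, &msg, 0);
 *
 *	if (n >= 0 && !(msg.msg_flags & MSG_EOR))
 *		;	// SOCK_SEQPACKET partial read, call recvmsg again
 */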
static ssize_t kcm_sock_splice(struct sock *sk,
			       struct pipe_inode_info *pipe,
			       struct splice_pipe_desc *spd)
{
	int ret;

	release_sock(sk);
	ret = splice_to_pipe(pipe, spd);
	lock_sock(sk);

	return ret;
}

static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
			       struct pipe_inode_info *pipe, size_t len,
			       unsigned int flags)
{
	struct sock *sk = sock->sk;
	struct kcm_sock *kcm = kcm_sk(sk);
	long timeo;
	struct kcm_rx_msg *rxm;
	int err = 0;
	ssize_t copied;
	struct sk_buff *skb;

	/* Only support splice for SOCK_SEQPACKET */

	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	lock_sock(sk);

	skb = kcm_wait_data(sk, flags, timeo, &err);
	if (!skb)
		goto err_out;

	/* Okay, have a message on the receive queue */

	rxm = kcm_rx_msg(skb);

	if (len > rxm->full_len)
		len = rxm->full_len;

	copied = skb_splice_bits(skb, sk, rxm->offset, pipe, len, flags,
				 kcm_sock_splice);
	if (copied < 0) {
		err = copied;
		goto err_out;
	}

	KCM_STATS_ADD(kcm->stats.rx_bytes, copied);

	rxm->offset += copied;
	rxm->full_len -= copied;

	/* We have no way to return MSG_EOR. If all the bytes have been
	 * read we still leave the message in the receive socket buffer.
	 * A subsequent recvmsg needs to be done to return MSG_EOR and
	 * finish reading the message.
	 */

	release_sock(sk);

	return copied;

err_out:
	release_sock(sk);

	return err;
}
/* kcm sock lock held */
static void kcm_recv_disable(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;

	if (kcm->rx_disabled)
		return;

	spin_lock_bh(&mux->rx_lock);

	kcm->rx_disabled = 1;

	/* If a psock is reserved we'll do cleanup in unreserve */
	if (!kcm->rx_psock) {
		if (kcm->rx_wait) {
			list_del(&kcm->wait_rx_list);
			kcm->rx_wait = false;
		}

		requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
	}

	spin_unlock_bh(&mux->rx_lock);
}

/* kcm sock lock held */
static void kcm_recv_enable(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;

	if (!kcm->rx_disabled)
		return;

	spin_lock_bh(&mux->rx_lock);

	kcm->rx_disabled = 0;
	kcm_rcv_ready(kcm);

	spin_unlock_bh(&mux->rx_lock);
}
static int kcm_setsockopt(struct socket *sock, int level, int optname,
			  char __user *optval, unsigned int optlen)
{
	struct kcm_sock *kcm = kcm_sk(sock->sk);
	int val, valbool;
	int err = 0;

	if (level != SOL_KCM)
		return -ENOPROTOOPT;

	if (optlen < sizeof(int))
		return -EINVAL;

	if (get_user(val, (int __user *)optval))
		return -EINVAL;

	valbool = val ? 1 : 0;

	switch (optname) {
	case KCM_RECV_DISABLE:
		lock_sock(&kcm->sk);
		if (valbool)
			kcm_recv_disable(kcm);
		else
			kcm_recv_enable(kcm);
		release_sock(&kcm->sk);
		break;
	default:
		err = -ENOPROTOOPT;
	}

	return err;
}

static int kcm_getsockopt(struct socket *sock, int level, int optname,
			  char __user *optval, int __user *optlen)
{
	struct kcm_sock *kcm = kcm_sk(sock->sk);
	int val, len;

	if (level != SOL_KCM)
		return -ENOPROTOOPT;

	if (get_user(len, optlen))
		return -EFAULT;

	len = min_t(unsigned int, len, sizeof(int));
	if (len < 0)
		return -EINVAL;

	switch (optname) {
	case KCM_RECV_DISABLE:
		val = kcm->rx_disabled;
		break;
	default:
		return -ENOPROTOOPT;
	}

	if (put_user(len, optlen))
		return -EFAULT;
	if (copy_to_user(optval, &val, len))
		return -EFAULT;
	return 0;
}
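
/*
 * Usage sketch from user space: KCM_RECV_DISABLE (the only SOL_KCM option)
 * stops messages from being steered to this particular KCM socket; pending
 * and future messages are redistributed to the other KCM sockets on the
 * same mux:
 *
 *	int on = 1;
 *
 *	if (setsockopt(kcmfd, SOL_KCM, KCM_RECV_DISABLE, &on, sizeof(on)))
 *		perror("setsockopt KCM_RECV_DISABLE");
 */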
static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
{
	struct kcm_sock *tkcm;
	struct list_head *head;
	int index = 0;

	/* For SOCK_SEQPACKET sock type, datagram_poll checks the sk_state, so
	 * we set sk_state, otherwise epoll_wait always returns right away with
	 * POLLHUP
	 */
	kcm->sk.sk_state = TCP_ESTABLISHED;

	/* Add to mux's kcm sockets list */
	kcm->mux = mux;
	spin_lock_bh(&mux->lock);

	head = &mux->kcm_socks;
	list_for_each_entry(tkcm, &mux->kcm_socks, kcm_sock_list) {
		if (tkcm->index != index)
			break;
		head = &tkcm->kcm_sock_list;
		index++;
	}

	list_add(&kcm->kcm_sock_list, head);
	kcm->index = index;

	mux->kcm_socks_cnt++;
	spin_unlock_bh(&mux->lock);

	INIT_WORK(&kcm->tx_work, kcm_tx_work);

	spin_lock_bh(&mux->rx_lock);
	kcm_rcv_ready(kcm);
	spin_unlock_bh(&mux->rx_lock);
}
static int kcm_attach(struct socket *sock, struct socket *csock,
		      struct bpf_prog *prog)
{
	struct kcm_sock *kcm = kcm_sk(sock->sk);
	struct kcm_mux *mux = kcm->mux;
	struct sock *csk;
	struct kcm_psock *psock = NULL, *tpsock;
	struct list_head *head;
	int index = 0;

	if (csock->ops->family != PF_INET &&
	    csock->ops->family != PF_INET6)
		return -EINVAL;

	csk = csock->sk;
	if (!csk)
		return -EINVAL;

	/* Only support TCP for now */
	if (csk->sk_protocol != IPPROTO_TCP)
		return -EINVAL;

	psock = kmem_cache_zalloc(kcm_psockp, GFP_KERNEL);
	if (!psock)
		return -ENOMEM;

	psock->mux = mux;
	psock->sk = csk;
	psock->bpf_prog = prog;
	INIT_WORK(&psock->rx_work, psock_rx_work);
	INIT_DELAYED_WORK(&psock->rx_delayed_work, psock_rx_delayed_work);

	write_lock_bh(&csk->sk_callback_lock);
	psock->save_data_ready = csk->sk_data_ready;
	psock->save_write_space = csk->sk_write_space;
	psock->save_state_change = csk->sk_state_change;
	csk->sk_user_data = psock;
	csk->sk_data_ready = psock_tcp_data_ready;
	csk->sk_write_space = psock_tcp_write_space;
	csk->sk_state_change = psock_tcp_state_change;
	write_unlock_bh(&csk->sk_callback_lock);

	/* Finished initialization, now add the psock to the MUX. */
	spin_lock_bh(&mux->lock);
	head = &mux->psocks;
	list_for_each_entry(tpsock, &mux->psocks, psock_list) {
		if (tpsock->index != index)
			break;
		head = &tpsock->psock_list;
		index++;
	}

	list_add(&psock->psock_list, head);
	psock->index = index;

	KCM_STATS_INCR(mux->stats.psock_attach);
	mux->psocks_cnt++;
	psock_now_avail(psock);
	spin_unlock_bh(&mux->lock);

	/* Schedule RX work in case there are already bytes queued */
	queue_work(kcm_wq, &psock->rx_work);

	return 0;
}
static int kcm_attach_ioctl(struct socket *sock, struct kcm_attach *info)
{
	struct socket *csock;
	struct bpf_prog *prog;
	int err;

	csock = sockfd_lookup(info->fd, &err);
	if (!csock)
		return -ENOENT;

	prog = bpf_prog_get(info->bpf_fd);
	if (IS_ERR(prog)) {
		err = PTR_ERR(prog);
		goto out;
	}

	if (prog->type != BPF_PROG_TYPE_SOCKET_FILTER) {
		bpf_prog_put(prog);
		err = -EINVAL;
		goto out;
	}

	err = kcm_attach(sock, csock, prog);
	if (err) {
		bpf_prog_put(prog);
		goto out;
	}

	/* Keep reference on file also */

	return 0;
out:
	fput(csock->file);
	return err;
}
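
/*
 * Usage sketch from user space (assumptions: "tcpfd" is a connected TCP
 * socket and "bpf_fd" is the fd of a loaded BPF_PROG_TYPE_SOCKET_FILTER
 * program, e.g. obtained from bpf(2), that returns the message length):
 *
 *	#include <linux/kcm.h>
 *
 *	int kcmfd = socket(AF_KCM, SOCK_SEQPACKET, KCMPROTO_CONNECTED);
 *	struct kcm_attach attach = {
 *		.fd = tcpfd,
 *		.bpf_fd = bpf_fd,
 *	};
 *
 *	if (ioctl(kcmfd, SIOCKCMATTACH, &attach))
 *		perror("SIOCKCMATTACH");
 */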
static void kcm_unattach(struct kcm_psock *psock)
{
	struct sock *csk = psock->sk;
	struct kcm_mux *mux = psock->mux;

	/* Stop getting callbacks from TCP socket. After this there should
	 * be no way to reserve a kcm for this psock.
	 */
	write_lock_bh(&csk->sk_callback_lock);
	csk->sk_user_data = NULL;
	csk->sk_data_ready = psock->save_data_ready;
	csk->sk_write_space = psock->save_write_space;
	csk->sk_state_change = psock->save_state_change;
	psock->rx_stopped = 1;

	if (WARN_ON(psock->rx_kcm)) {
		write_unlock_bh(&csk->sk_callback_lock);
		return;
	}

	spin_lock_bh(&mux->rx_lock);

	/* Stop receiver activities. After this point psock should not be
	 * able to get onto ready list either through callbacks or work.
	 */
	if (psock->ready_rx_msg) {
		list_del(&psock->psock_ready_list);
		kfree_skb(psock->ready_rx_msg);
		psock->ready_rx_msg = NULL;
		KCM_STATS_INCR(mux->stats.rx_ready_drops);
	}

	spin_unlock_bh(&mux->rx_lock);

	write_unlock_bh(&csk->sk_callback_lock);

	cancel_work_sync(&psock->rx_work);
	cancel_delayed_work_sync(&psock->rx_delayed_work);

	bpf_prog_put(psock->bpf_prog);

	kfree_skb(psock->rx_skb_head);
	psock->rx_skb_head = NULL;

	spin_lock_bh(&mux->lock);

	aggregate_psock_stats(&psock->stats, &mux->aggregate_psock_stats);

	KCM_STATS_INCR(mux->stats.psock_unattach);

	if (psock->tx_kcm) {
		/* psock was reserved. Just mark it finished and we will clean
		 * up in the kcm paths, we need kcm lock which can not be
		 * acquired here.
		 */
		KCM_STATS_INCR(mux->stats.psock_unattach_rsvd);
		spin_unlock_bh(&mux->lock);

		/* We are unattaching a socket that is reserved. Abort the
		 * socket since we may be out of sync in sending on it. We need
		 * to do this without the mux lock.
		 */
		kcm_abort_tx_psock(psock, EPIPE, false);

		spin_lock_bh(&mux->lock);
		if (!psock->tx_kcm) {
			/* psock now unreserved in window mux was unlocked */
			goto no_reserved;
		}
		psock->done = 1;

		/* Commit done before queuing work to process it */
		smp_mb();

		/* Queue tx work to make sure psock->done is handled */
		queue_work(kcm_wq, &psock->tx_kcm->tx_work);
		spin_unlock_bh(&mux->lock);
	} else {
no_reserved:
		if (!psock->tx_stopped)
			list_del(&psock->psock_avail_list);
		list_del(&psock->psock_list);
		mux->psocks_cnt--;
		spin_unlock_bh(&mux->lock);

		fput(csk->sk_socket->file);
		kmem_cache_free(kcm_psockp, psock);
	}
}
*sock
, struct kcm_unattach
*info
)
1856 struct kcm_sock
*kcm
= kcm_sk(sock
->sk
);
1857 struct kcm_mux
*mux
= kcm
->mux
;
1858 struct kcm_psock
*psock
;
1859 struct socket
*csock
;
1863 csock
= sockfd_lookup(info
->fd
, &err
);
1875 spin_lock_bh(&mux
->lock
);
1877 list_for_each_entry(psock
, &mux
->psocks
, psock_list
) {
1878 if (psock
->sk
!= csk
)
1881 /* Found the matching psock */
1883 if (psock
->unattaching
|| WARN_ON(psock
->done
)) {
1888 psock
->unattaching
= 1;
1890 spin_unlock_bh(&mux
->lock
);
1892 kcm_unattach(psock
);
1898 spin_unlock_bh(&mux
->lock
);
static struct proto kcm_proto = {
	.name	= "KCM",
	.owner	= THIS_MODULE,
	.obj_size = sizeof(struct kcm_sock),
};
/* Clone a kcm socket. */
static int kcm_clone(struct socket *osock, struct kcm_clone *info,
		     struct socket **newsockp)
{
	struct socket *newsock;
	struct sock *newsk;
	struct file *newfile;
	int err, newfd;

	err = -ENFILE;
	newsock = sock_alloc();
	if (!newsock)
		goto out;

	newsock->type = osock->type;
	newsock->ops = osock->ops;

	__module_get(newsock->ops->owner);

	newfd = get_unused_fd_flags(0);
	if (unlikely(newfd < 0)) {
		err = newfd;
		goto out_fd_fail;
	}

	newfile = sock_alloc_file(newsock, 0, osock->sk->sk_prot_creator->name);
	if (unlikely(IS_ERR(newfile))) {
		err = PTR_ERR(newfile);
		goto out_sock_alloc_fail;
	}

	newsk = sk_alloc(sock_net(osock->sk), PF_KCM, GFP_KERNEL,
			 &kcm_proto, true);
	if (!newsk) {
		err = -ENOMEM;
		goto out_sk_alloc_fail;
	}

	sock_init_data(newsock, newsk);
	init_kcm_sock(kcm_sk(newsk), kcm_sk(osock->sk)->mux);

	fd_install(newfd, newfile);
	*newsockp = newsock;
	info->fd = newfd;

	return 0;

out_sk_alloc_fail:
	fput(newfile);
out_sock_alloc_fail:
	put_unused_fd(newfd);
out_fd_fail:
	sock_release(newsock);
out:
	return err;
}
*sock
, unsigned int cmd
, unsigned long arg
)
1973 case SIOCKCMATTACH
: {
1974 struct kcm_attach info
;
1976 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
1979 err
= kcm_attach_ioctl(sock
, &info
);
1983 case SIOCKCMUNATTACH
: {
1984 struct kcm_unattach info
;
1986 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
1989 err
= kcm_unattach_ioctl(sock
, &info
);
1993 case SIOCKCMCLONE
: {
1994 struct kcm_clone info
;
1995 struct socket
*newsock
= NULL
;
1997 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
2000 err
= kcm_clone(sock
, &info
, &newsock
);
2003 if (copy_to_user((void __user
*)arg
, &info
,
2006 sock_release(newsock
);
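
/*
 * Usage sketch from user space: SIOCKCMCLONE creates another KCM socket on
 * the same mux (for example one per worker thread); the new descriptor is
 * returned in kcm_clone.fd:
 *
 *	struct kcm_clone clone = { 0 };
 *
 *	if (ioctl(kcmfd, SIOCKCMCLONE, &clone) == 0)
 *		worker_fd = clone.fd;	// hypothetical per-thread variable
 */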
static void free_mux(struct rcu_head *rcu)
{
	struct kcm_mux *mux = container_of(rcu,
					   struct kcm_mux, rcu);

	kmem_cache_free(kcm_muxp, mux);
}

static void release_mux(struct kcm_mux *mux)
{
	struct kcm_net *knet = mux->knet;
	struct kcm_psock *psock, *tmp_psock;

	/* Release psocks */
	list_for_each_entry_safe(psock, tmp_psock,
				 &mux->psocks, psock_list) {
		if (!WARN_ON(psock->unattaching))
			kcm_unattach(psock);
	}

	if (WARN_ON(mux->psocks_cnt))
		return;

	__skb_queue_purge(&mux->rx_hold_queue);

	mutex_lock(&knet->mutex);
	aggregate_mux_stats(&mux->stats, &knet->aggregate_mux_stats);
	aggregate_psock_stats(&mux->aggregate_psock_stats,
			      &knet->aggregate_psock_stats);
	list_del_rcu(&mux->kcm_mux_list);
	knet->count--;
	mutex_unlock(&knet->mutex);

	call_rcu(&mux->rcu, free_mux);
}
static void kcm_done(struct kcm_sock *kcm)
{
	struct kcm_mux *mux = kcm->mux;
	struct sock *sk = &kcm->sk;
	int socks_cnt;

	spin_lock_bh(&mux->rx_lock);
	if (kcm->rx_psock) {
		/* Cleanup in unreserve_rx_kcm */
		WARN_ON(kcm->done);
		kcm->rx_disabled = 1;
		kcm->done = 1;
		spin_unlock_bh(&mux->rx_lock);
		return;
	}

	if (kcm->rx_wait) {
		list_del(&kcm->wait_rx_list);
		kcm->rx_wait = false;
	}
	/* Move any pending receive messages to other kcm sockets */
	requeue_rx_msgs(mux, &sk->sk_receive_queue);

	spin_unlock_bh(&mux->rx_lock);

	if (WARN_ON(sk_rmem_alloc_get(sk)))
		return;

	/* Detach from MUX */
	spin_lock_bh(&mux->lock);

	list_del(&kcm->kcm_sock_list);
	mux->kcm_socks_cnt--;
	socks_cnt = mux->kcm_socks_cnt;

	spin_unlock_bh(&mux->lock);

	if (!socks_cnt) {
		/* We are done with the mux now. */
		release_mux(mux);
	}

	WARN_ON(kcm->rx_wait);

	sock_put(&kcm->sk);
}
/* Close a KCM socket.
 * If this is the last KCM socket on the MUX, destroy the MUX.
 */
static int kcm_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct kcm_sock *kcm;
	struct kcm_mux *mux;
	struct kcm_psock *psock;

	if (!sk)
		return 0;

	kcm = kcm_sk(sk);
	mux = kcm->mux;

	sock_orphan(sk);
	kfree_skb(kcm->seq_skb);

	lock_sock(sk);
	/* Purge queue under lock to avoid race condition with tx_work trying
	 * to act when queue is nonempty. If tx_work runs after this point
	 * it will just return.
	 */
	__skb_queue_purge(&sk->sk_write_queue);
	release_sock(sk);

	spin_lock_bh(&mux->lock);

	/* Take off the tx_wait list, after this point there should be no way
	 * that a psock will be assigned to this kcm.
	 */
	list_del(&kcm->wait_psock_list);
	kcm->tx_wait = false;

	spin_unlock_bh(&mux->lock);

	/* Cancel work. After this point there should be no outside references
	 * to the kcm socket.
	 */
	cancel_work_sync(&kcm->tx_work);

	lock_sock(sk);
	psock = kcm->tx_psock;
	if (psock) {
		/* A psock was reserved, so we need to kill it since it
		 * may already have some bytes queued from a message. We
		 * need to do this after removing kcm from tx_wait list.
		 */
		kcm_abort_tx_psock(psock, EPIPE, false);
		unreserve_psock(kcm);
	}
	release_sock(sk);

	WARN_ON(kcm->tx_wait);
	WARN_ON(kcm->tx_psock);

	sock->sk = NULL;

	kcm_done(kcm);

	return 0;
}
static const struct proto_ops kcm_dgram_ops = {
	.family =	PF_KCM,
	.owner =	THIS_MODULE,
	.release =	kcm_release,
	.bind =		sock_no_bind,
	.connect =	sock_no_connect,
	.socketpair =	sock_no_socketpair,
	.accept =	sock_no_accept,
	.getname =	sock_no_getname,
	.poll =		datagram_poll,
	.ioctl =	kcm_ioctl,
	.listen =	sock_no_listen,
	.shutdown =	sock_no_shutdown,
	.setsockopt =	kcm_setsockopt,
	.getsockopt =	kcm_getsockopt,
	.sendmsg =	kcm_sendmsg,
	.recvmsg =	kcm_recvmsg,
	.mmap =		sock_no_mmap,
	.sendpage =	kcm_sendpage,
};

static const struct proto_ops kcm_seqpacket_ops = {
	.family =	PF_KCM,
	.owner =	THIS_MODULE,
	.release =	kcm_release,
	.bind =		sock_no_bind,
	.connect =	sock_no_connect,
	.socketpair =	sock_no_socketpair,
	.accept =	sock_no_accept,
	.getname =	sock_no_getname,
	.poll =		datagram_poll,
	.ioctl =	kcm_ioctl,
	.listen =	sock_no_listen,
	.shutdown =	sock_no_shutdown,
	.setsockopt =	kcm_setsockopt,
	.getsockopt =	kcm_getsockopt,
	.sendmsg =	kcm_sendmsg,
	.recvmsg =	kcm_recvmsg,
	.mmap =		sock_no_mmap,
	.sendpage =	kcm_sendpage,
	.splice_read =	kcm_splice_read,
};
/* Create proto operation for kcm sockets */
static int kcm_create(struct net *net, struct socket *sock,
		      int protocol, int kern)
{
	struct kcm_net *knet = net_generic(net, kcm_net_id);
	struct sock *sk;
	struct kcm_mux *mux;

	switch (sock->type) {
	case SOCK_DGRAM:
		sock->ops = &kcm_dgram_ops;
		break;
	case SOCK_SEQPACKET:
		sock->ops = &kcm_seqpacket_ops;
		break;
	default:
		return -ESOCKTNOSUPPORT;
	}

	if (protocol != KCMPROTO_CONNECTED)
		return -EPROTONOSUPPORT;

	sk = sk_alloc(net, PF_KCM, GFP_KERNEL, &kcm_proto, kern);
	if (!sk)
		return -ENOMEM;

	/* Allocate a kcm mux, shared between KCM sockets */
	mux = kmem_cache_zalloc(kcm_muxp, GFP_KERNEL);
	if (!mux) {
		sk_free(sk);
		return -ENOMEM;
	}

	spin_lock_init(&mux->lock);
	spin_lock_init(&mux->rx_lock);
	INIT_LIST_HEAD(&mux->kcm_socks);
	INIT_LIST_HEAD(&mux->kcm_rx_waiters);
	INIT_LIST_HEAD(&mux->kcm_tx_waiters);

	INIT_LIST_HEAD(&mux->psocks);
	INIT_LIST_HEAD(&mux->psocks_ready);
	INIT_LIST_HEAD(&mux->psocks_avail);

	mux->knet = knet;

	/* Add new MUX to list */
	mutex_lock(&knet->mutex);
	list_add_rcu(&mux->kcm_mux_list, &knet->mux_list);
	knet->count++;
	mutex_unlock(&knet->mutex);

	skb_queue_head_init(&mux->rx_hold_queue);

	/* Init KCM socket */
	sock_init_data(sock, sk);
	init_kcm_sock(kcm_sk(sk), mux);

	return 0;
}
static struct net_proto_family kcm_family_ops = {
	.family = PF_KCM,
	.create = kcm_create,
	.owner  = THIS_MODULE,
};
static __net_init int kcm_init_net(struct net *net)
{
	struct kcm_net *knet = net_generic(net, kcm_net_id);

	INIT_LIST_HEAD_RCU(&knet->mux_list);
	mutex_init(&knet->mutex);

	return 0;
}

static __net_exit void kcm_exit_net(struct net *net)
{
	struct kcm_net *knet = net_generic(net, kcm_net_id);

	/* All KCM sockets should be closed at this point, which should mean
	 * that all multiplexors and psocks have been destroyed.
	 */
	WARN_ON(!list_empty(&knet->mux_list));
}
static struct pernet_operations kcm_net_ops = {
	.init = kcm_init_net,
	.exit = kcm_exit_net,
	.id   = &kcm_net_id,
	.size = sizeof(struct kcm_net),
};
static int __init kcm_init(void)
{
	int err = -ENOMEM;

	kcm_muxp = kmem_cache_create("kcm_mux_cache",
				     sizeof(struct kcm_mux), 0,
				     SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
	if (!kcm_muxp)
		goto fail;

	kcm_psockp = kmem_cache_create("kcm_psock_cache",
				       sizeof(struct kcm_psock), 0,
				       SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
	if (!kcm_psockp)
		goto fail;

	kcm_wq = create_singlethread_workqueue("kkcmd");
	if (!kcm_wq)
		goto fail;

	err = proto_register(&kcm_proto, 1);
	if (err)
		goto fail;

	err = sock_register(&kcm_family_ops);
	if (err)
		goto sock_register_fail;

	err = register_pernet_device(&kcm_net_ops);
	if (err)
		goto net_ops_fail;

	err = kcm_proc_init();
	if (err)
		goto proc_init_fail;

	return 0;

proc_init_fail:
	unregister_pernet_device(&kcm_net_ops);

net_ops_fail:
	sock_unregister(PF_KCM);

sock_register_fail:
	proto_unregister(&kcm_proto);

fail:
	kmem_cache_destroy(kcm_muxp);
	kmem_cache_destroy(kcm_psockp);

	if (kcm_wq)
		destroy_workqueue(kcm_wq);

	return err;
}
static void __exit kcm_exit(void)
{
	kcm_proc_exit();
	unregister_pernet_device(&kcm_net_ops);
	sock_unregister(PF_KCM);
	proto_unregister(&kcm_proto);
	destroy_workqueue(kcm_wq);

	kmem_cache_destroy(kcm_muxp);
	kmem_cache_destroy(kcm_psockp);
}

module_init(kcm_init);
module_exit(kcm_exit);

MODULE_LICENSE("GPL");
MODULE_ALIAS_NETPROTO(PF_KCM);