staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing
[deliverable/linux.git] / drivers / staging / lustre / lnet / lnet / lib-move.c
index cc8c2c5583896a240c194ad7699a0d0cdee03cad..f2b1116e9dd9bfdbe0a93a201ed99e81c733cd32 100644 (file)
@@ -945,9 +945,6 @@ lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
        rbp = lnet_msg2bufpool(msg);
 
        if (!msg->msg_rtrcredit) {
-               LASSERT((rbp->rbp_credits < 0) ==
-                        !list_empty(&rbp->rbp_msgs));
-
                msg->msg_rtrcredit = 1;
                rbp->rbp_credits--;
                if (rbp->rbp_credits < rbp->rbp_mincredits)
@@ -1038,6 +1035,43 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
        }
 }
 
+void
+lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
+{
+       lnet_msg_t *msg;
+
+       if (list_empty(&rbp->rbp_msgs))
+               return;
+       msg = list_entry(rbp->rbp_msgs.next,
+                        lnet_msg_t, msg_list);
+       list_del(&msg->msg_list);
+
+       (void)lnet_post_routed_recv_locked(msg, 1);
+}
+
+void
+lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
+{
+       struct list_head drop;
+       lnet_msg_t *msg;
+       lnet_msg_t *tmp;
+
+       INIT_LIST_HEAD(&drop);
+
+       list_splice_init(list, &drop);
+
+       lnet_net_unlock(cpt);
+
+       list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+               lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+                            0, 0, 0, msg->msg_hdr.payload_length);
+               list_del_init(&msg->msg_list);
+               lnet_finalize(NULL, msg, -ECANCELED);
+       }
+
+       lnet_net_lock(cpt);
+}
+
 void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
@@ -1058,27 +1092,41 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
 
                rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
                rbp = rb->rb_pool;
-               LASSERT(rbp == lnet_msg2bufpool(msg));
 
                msg->msg_kiov = NULL;
                msg->msg_rtrcredit = 0;
 
-               LASSERT((rbp->rbp_credits < 0) ==
-                       !list_empty(&rbp->rbp_msgs));
+               LASSERT(rbp == lnet_msg2bufpool(msg));
+
                LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));
 
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
-               rbp->rbp_credits++;
-               if (rbp->rbp_credits <= 0) {
-                       msg2 = list_entry(rbp->rbp_msgs.next,
-                                         lnet_msg_t, msg_list);
-                       list_del(&msg2->msg_list);
+               /*
+                * If routing is now turned off, we just drop this buffer and
+                * don't bother trying to return credits.
+                */
+               if (!the_lnet.ln_routing) {
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+                       goto routing_off;
+               }
 
-                       (void) lnet_post_routed_recv_locked(msg2, 1);
+               /*
+                * It is possible that a user has lowered the desired number of
+                * buffers in this pool.  Make sure we never put back
+                * more buffers than the stated number.
+                */
+               if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
+                       /* Discard this buffer so we don't have too many. */
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+               } else {
+                       list_add(&rb->rb_list, &rbp->rbp_bufs);
+                       rbp->rbp_credits++;
+                       if (rbp->rbp_credits <= 0)
+                               lnet_schedule_blocked_locked(rbp);
                }
        }
 
+routing_off:
        if (msg->msg_peerrtrcredit) {
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
@@ -1087,7 +1135,14 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                        !list_empty(&rxpeer->lp_rtrq));
 
                rxpeer->lp_rtrcredits++;
-               if (rxpeer->lp_rtrcredits <= 0) {
+               /*
+                * drop all messages which are queued to be routed on that
+                * peer.
+                */
+               if (!the_lnet.ln_routing) {
+                       lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+                                                    msg->msg_rx_cpt);
+               } else if (rxpeer->lp_rtrcredits <= 0) {
                        msg2 = list_entry(rxpeer->lp_rtrq.next,
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
@@ -1646,6 +1701,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
 {
        int rc = 0;
 
+       if (!the_lnet.ln_routing)
+               return -ECANCELED;
+
        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
                if (!ni->ni_lnd->lnd_eager_recv) {
@@ -1799,9 +1857,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
 
        if (the_lnet.ln_routing &&
            ni->ni_last_alive != ktime_get_real_seconds()) {
-               lnet_ni_lock(ni);
-
                /* NB: so far here is the only place to set NI status to "up */
+               lnet_ni_lock(ni);
                ni->ni_last_alive = ktime_get_real_seconds();
                if (ni->ni_status &&
                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
This page took 0.02738 seconds and 5 git commands to generate.