staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing
diff --git a/drivers/staging/lustre/lnet/lnet/router.c b/drivers/staging/lustre/lnet/lnet/router.c
index 67566ca2db02b1d41976f9377b21ebfc3dc8c330..e3a661129163728e1240f2716137f3ae14b841b0 100644
--- a/drivers/staging/lustre/lnet/lnet/router.c
+++ b/drivers/staging/lustre/lnet/lnet/router.c
 #define LNET_NRB_TINY          (LNET_NRB_TINY_MIN * 4)
 #define LNET_NRB_SMALL_MIN     4096    /* min value for each CPT */
 #define LNET_NRB_SMALL         (LNET_NRB_SMALL_MIN * 4)
+#define LNET_NRB_SMALL_PAGES   1
 #define LNET_NRB_LARGE_MIN     256     /* min value for each CPT */
 #define LNET_NRB_LARGE         (LNET_NRB_LARGE_MIN * 4)
+#define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \
+                                PAGE_CACHE_SHIFT)
 
 static char *forwarding = "";
 module_param(forwarding, charp, 0444);
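
The two new *_PAGES macros replace the per-call page computations removed from lnet_rtrpools_alloc() further down: a small router buffer stays at one page, while a large buffer must hold a full LNET_MTU of payload rounded up to whole pages. A standalone sketch of that arithmetic follows; the LNET_MTU and page-size values are assumptions made for the example, not stated in this hunk.

/*
 * Illustration only, not part of the patch.  Assuming a 1 MiB LNET_MTU
 * and 4 KiB pages (PAGE_CACHE_SHIFT == 12), the macro rounds up to
 * 256 pages per large router buffer.
 */
#include <stdio.h>

#define PAGE_CACHE_SHIFT	12
#define PAGE_CACHE_SIZE		(1UL << PAGE_CACHE_SHIFT)
#define LNET_MTU		(1UL << 20)
#define LNET_NRB_LARGE_PAGES	((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \
				 PAGE_CACHE_SHIFT)

int main(void)
{
	printf("pages per large router buffer: %lu\n", LNET_NRB_LARGE_PAGES);
	return 0;
}
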
@@ -570,7 +573,8 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
                                        *hops     = route->lr_hops;
                                        *priority = route->lr_priority;
                                        *gateway  = route->lr_gateway->lp_nid;
-                                       *alive    = route->lr_gateway->lp_alive;
+                                       *alive = route->lr_gateway->lp_alive &&
+                                                !route->lr_downis;
                                        lnet_net_unlock(cpt);
                                        return 0;
                                }
@@ -608,7 +612,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
 {
        lnet_ping_info_t *info = rcd->rcd_pinginfo;
        struct lnet_peer *gw = rcd->rcd_gateway;
-       lnet_route_t *rtr;
+       lnet_route_t *rte;
 
        if (!gw->lp_alive)
                return;
@@ -634,11 +638,16 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
        if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS))
                return; /* can't carry NI status info */
 
-       list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
+       list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
                int down = 0;
                int up = 0;
                int i;
 
+               if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) {
+                       rte->lr_downis = 1;
+                       continue;
+               }
+
                for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
                        lnet_ni_status_t *stat = &info->pi_ni[i];
                        lnet_nid_t nid = stat->ns_nid;
@@ -659,7 +668,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                        }
 
                        if (stat->ns_status == LNET_NI_STATUS_UP) {
-                               if (LNET_NIDNET(nid) == rtr->lr_net) {
+                               if (LNET_NIDNET(nid) == rte->lr_net) {
                                        up = 1;
                                        break;
                                }
@@ -673,10 +682,10 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                }
 
                if (up) { /* ignore downed NIs if NI for dest network is up */
-                       rtr->lr_downis = 0;
+                       rte->lr_downis = 0;
                        continue;
                }
-               rtr->lr_downis = down;
+               rte->lr_downis = down;
        }
 }
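
The renamed rte loop also honours a new ping feature bit: a gateway that advertises LNET_PING_FEAT_RTE_DISABLED (set by lnet_rtrpools_disable() at the end of this patch) has routing turned off, so every route through it is marked down without consulting per-NI status. A condensed sketch of that per-route decision, with an illustrative name and bit value rather than LNet's:

/*
 * Sketch of the decision made in lnet_parse_rc_info() above; the
 * earlier "can't carry NI status info" bail-out is assumed to have
 * already happened.  The name and feature-bit value are illustrative.
 */
#include <stdbool.h>

#define PING_FEAT_RTE_DISABLED	(1U << 2)	/* assumed bit value */

bool route_is_down(unsigned int gw_feats, bool dest_net_ni_up,
		   int downed_nis)
{
	if (gw_feats & PING_FEAT_RTE_DISABLED)
		return true;		/* gateway has routing disabled */
	if (dest_net_ni_up)
		return false;	/* ignore downed NIs: dest network NI is up */
	return downed_nis > 0;
}
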
 
@@ -1226,7 +1235,7 @@ rescan:
        return 0;
 }
 
-static void
+void
 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
 {
        int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
@@ -1273,67 +1282,103 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
 }
 
 static void
-lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
+lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt)
 {
        int npages = rbp->rbp_npages;
-       int nbuffers = 0;
+       struct list_head tmp;
        lnet_rtrbuf_t *rb;
 
        if (!rbp->rbp_nbuffers) /* not initialized or already freed */
                return;
 
-       LASSERT(list_empty(&rbp->rbp_msgs));
-       LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers);
+       INIT_LIST_HEAD(&tmp);
 
-       while (!list_empty(&rbp->rbp_bufs)) {
-               LASSERT(rbp->rbp_credits > 0);
+       lnet_net_lock(cpt);
+       lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+       list_splice_init(&rbp->rbp_bufs, &tmp);
+       rbp->rbp_nbuffers = 0;
+       rbp->rbp_credits = 0;
+       rbp->rbp_mincredits = 0;
+       lnet_net_unlock(cpt);
 
-               rb = list_entry(rbp->rbp_bufs.next,
-                               lnet_rtrbuf_t, rb_list);
+       /* Free buffers on the free list. */
+       while (!list_empty(&tmp)) {
+               rb = list_entry(tmp.next, lnet_rtrbuf_t, rb_list);
                list_del(&rb->rb_list);
                lnet_destroy_rtrbuf(rb, npages);
-               nbuffers++;
        }
-
-       LASSERT(rbp->rbp_nbuffers == nbuffers);
-       LASSERT(rbp->rbp_credits == nbuffers);
-
-       rbp->rbp_nbuffers = 0;
-       rbp->rbp_credits = 0;
 }
 
 static int
-lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
+lnet_rtrpool_adjust_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
 {
+       struct list_head rb_list;
        lnet_rtrbuf_t *rb;
-       int i;
+       int num_rb;
+       int num_buffers = 0;
+       int npages = rbp->rbp_npages;
 
-       if (rbp->rbp_nbuffers) {
-               LASSERT(rbp->rbp_nbuffers == nbufs);
+       /*
+        * If we are called for fewer buffers than are already in the
+        * pool, we just lower the nbuffers number; the excess buffers
+        * will be thrown away as they are returned to the free list,
+        * and the credits are adjusted accordingly.
+        */
+       if (nbufs <= rbp->rbp_nbuffers) {
+               lnet_net_lock(cpt);
+               rbp->rbp_nbuffers = nbufs;
+               lnet_net_unlock(cpt);
                return 0;
        }
 
-       for (i = 0; i < nbufs; i++) {
-               rb = lnet_new_rtrbuf(rbp, cpt);
+       INIT_LIST_HEAD(&rb_list);
+
+       /*
+        * Allocate the buffers on a local list first.  If all buffers
+        * are allocated successfully, splice this list onto the rbp
+        * buffer list; otherwise free all of the allocated buffers.
+        */
+       num_rb = rbp->rbp_nbuffers;
 
+       while (num_rb < nbufs) {
+               rb = lnet_new_rtrbuf(rbp, cpt);
                if (!rb) {
-                       CERROR("Failed to allocate %d router bufs of %d pages\n",
-                              nbufs, rbp->rbp_npages);
-                       return -ENOMEM;
+                       CERROR("Failed to allocate %d route bufs of %d pages\n",
+                              nbufs, npages);
+                       goto failed;
                }
 
-               rbp->rbp_nbuffers++;
-               rbp->rbp_credits++;
-               rbp->rbp_mincredits++;
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
-
-               /* No allocation "under fire" */
-               /* Otherwise we'd need code to schedule blocked msgs etc */
-               LASSERT(!the_lnet.ln_routing);
+               list_add(&rb->rb_list, &rb_list);
+               num_buffers++;
+               num_rb++;
        }
 
-       LASSERT(rbp->rbp_credits == nbufs);
+       lnet_net_lock(cpt);
+
+       list_splice_tail(&rb_list, &rbp->rbp_bufs);
+       rbp->rbp_nbuffers += num_buffers;
+       rbp->rbp_credits += num_buffers;
+       rbp->rbp_mincredits = rbp->rbp_credits;
+       /*
+        * We need to schedule the blocked msgs using the newly
+        * added buffers.
+        */
+       while (!list_empty(&rbp->rbp_bufs) &&
+              !list_empty(&rbp->rbp_msgs))
+               lnet_schedule_blocked_locked(rbp);
+
+       lnet_net_unlock(cpt);
+
        return 0;
+
+failed:
+       while (!list_empty(&rb_list)) {
+               rb = list_entry(rb_list.next, lnet_rtrbuf_t, rb_list);
+               list_del(&rb->rb_list);
+               lnet_destroy_rtrbuf(rb, npages);
+       }
+
+       return -ENOMEM;
 }
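
Both reworked helpers follow the same locking pattern: lnet_rtrpool_free_bufs() splices the buffers out of the pool under lnet_net_lock() and frees them only after dropping it, while lnet_rtrpool_adjust_bufs() allocates new buffers on a private list with no lock held and takes the lock once to publish them (and wake messages blocked on credits). A userspace sketch of the grow direction; the pool type and pthread locking are illustrative stand-ins for LNet's per-CPT structures and lnet_net_lock().

/* Sketch only, not LNet code. */
#include <pthread.h>
#include <stdlib.h>

struct buf {
	struct buf *next;
};

struct pool {
	pthread_mutex_t lock;
	struct buf *free_list;	/* shared free list */
	int nbuffers;
};

/* Grow the pool to "target" buffers; on allocation failure nothing is
 * published and the pool is left untouched. */
int pool_grow(struct pool *p, int target)
{
	struct buf *local = NULL;
	struct buf *b;
	int added = 0;

	/*
	 * Build the new buffers on a private list with the lock not
	 * held.  As in the patch, nbuffers is assumed to change only on
	 * this (serialized) configuration path, so it is read unlocked.
	 */
	while (p->nbuffers + added < target) {
		b = malloc(sizeof(*b));
		if (!b)
			goto failed;
		b->next = local;
		local = b;
		added++;
	}

	/* One short critical section to publish the new buffers. */
	pthread_mutex_lock(&p->lock);
	while (local) {
		b = local;
		local = local->next;
		b->next = p->free_list;
		p->free_list = b;
	}
	p->nbuffers += added;
	pthread_mutex_unlock(&p->lock);
	return 0;

failed:
	while (local) {
		b = local;
		local = local->next;
		free(b);
	}
	return -1;
}
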
 
 static void
@@ -1348,7 +1393,7 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
 }
 
 void
-lnet_rtrpools_free(void)
+lnet_rtrpools_free(int keep_pools)
 {
        lnet_rtrbufpool_t *rtrp;
        int i;
@@ -1357,17 +1402,19 @@ lnet_rtrpools_free(void)
                return;
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_free_bufs(&rtrp[0]);
-               lnet_rtrpool_free_bufs(&rtrp[1]);
-               lnet_rtrpool_free_bufs(&rtrp[2]);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
        }
 
-       cfs_percpt_free(the_lnet.ln_rtrpools);
-       the_lnet.ln_rtrpools = NULL;
+       if (!keep_pools) {
+               cfs_percpt_free(the_lnet.ln_rtrpools);
+               the_lnet.ln_rtrpools = NULL;
+       }
 }
 
 static int
-lnet_nrb_tiny_calculate(int npages)
+lnet_nrb_tiny_calculate(void)
 {
        int nrbs = LNET_NRB_TINY;
 
@@ -1386,7 +1433,7 @@ lnet_nrb_tiny_calculate(int npages)
 }
 
 static int
-lnet_nrb_small_calculate(int npages)
+lnet_nrb_small_calculate(void)
 {
        int nrbs = LNET_NRB_SMALL;
 
@@ -1405,7 +1452,7 @@ lnet_nrb_small_calculate(int npages)
 }
 
 static int
-lnet_nrb_large_calculate(int npages)
+lnet_nrb_large_calculate(void)
 {
        int nrbs = LNET_NRB_LARGE;
 
@@ -1427,16 +1474,12 @@ int
 lnet_rtrpools_alloc(int im_a_router)
 {
        lnet_rtrbufpool_t *rtrp;
-       int large_pages;
-       int small_pages = 1;
        int nrb_tiny;
        int nrb_small;
        int nrb_large;
        int rc;
        int i;
 
-       large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-
        if (!strcmp(forwarding, "")) {
                /* not set either way */
                if (!im_a_router)
@@ -1451,15 +1494,15 @@ lnet_rtrpools_alloc(int im_a_router)
                return -EINVAL;
        }
 
-       nrb_tiny = lnet_nrb_tiny_calculate(0);
+       nrb_tiny = lnet_nrb_tiny_calculate();
        if (nrb_tiny < 0)
                return -EINVAL;
 
-       nrb_small = lnet_nrb_small_calculate(small_pages);
+       nrb_small = lnet_nrb_small_calculate();
        if (nrb_small < 0)
                return -EINVAL;
 
-       nrb_large = lnet_nrb_large_calculate(large_pages);
+       nrb_large = lnet_nrb_large_calculate();
        if (nrb_large < 0)
                return -EINVAL;
 
@@ -1473,18 +1516,23 @@ lnet_rtrpools_alloc(int im_a_router)
        }
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_init(&rtrp[0], 0);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
+               lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                             nrb_tiny, i);
                if (rc)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[1], small_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
+               lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
+                                 LNET_NRB_SMALL_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                             nrb_small, i);
                if (rc)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[2], large_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
+               lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
+                                 LNET_NRB_LARGE_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                             nrb_large, i);
                if (rc)
                        goto failed;
        }
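
Throughout the patch the magic rtrp[0]/[1]/[2] indices are replaced with named constants. Their definitions are not part of this file; the sketch below shows what they are assumed to look like in the LNet headers, with values that follow directly from the indices they replace.

/* Assumed definitions (they live in an LNet header, not in router.c). */
#define LNET_TINY_BUF_IDX	0
#define LNET_SMALL_BUF_IDX	1
#define LNET_LARGE_BUF_IDX	2
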
@@ -1496,10 +1544,113 @@ lnet_rtrpools_alloc(int im_a_router)
        return 0;
 
  failed:
-       lnet_rtrpools_free();
+       lnet_rtrpools_free(0);
        return rc;
 }
 
+int
+lnet_rtrpools_adjust(int tiny, int small, int large)
+{
+       int nrb = 0;
+       int rc = 0;
+       int i;
+       lnet_rtrbufpool_t *rtrp;
+
+       /*
+        * This function doesn't revert the changes if adding new
+        * buffers fails.  It's up to the user-space caller to revert
+        * the changes.
+        */
+
+       if (!the_lnet.ln_routing)
+               return 0;
+
+       /*
+        * If the provided values for each buffer pool differ from the
+        * configured values, we need to take action.
+        */
+       if (tiny >= 0 && tiny != tiny_router_buffers) {
+               tiny_router_buffers = tiny;
+               nrb = lnet_nrb_tiny_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+       if (small >= 0 && small != small_router_buffers) {
+               small_router_buffers = small;
+               nrb = lnet_nrb_small_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+       if (large >= 0 && large != large_router_buffers) {
+               large_router_buffers = large;
+               nrb = lnet_nrb_large_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+
+       return 0;
+}
+
+int
+lnet_rtrpools_enable(void)
+{
+       int rc;
+
+       if (the_lnet.ln_routing)
+               return 0;
+
+       if (!the_lnet.ln_rtrpools)
+               /*
+                * If routing is turned off, and we have never
+                * initialized the pools before, just call the
+                * standard buffer pool allocation routine as
+                * if we are just configuring this for the first
+                * time.
+                */
+               return lnet_rtrpools_alloc(1);
+
+       rc = lnet_rtrpools_adjust(0, 0, 0);
+       if (rc)
+               return rc;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 1;
+
+       the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
+       lnet_net_unlock(LNET_LOCK_EX);
+
+       return 0;
+}
+
+void
+lnet_rtrpools_disable(void)
+{
+       if (!the_lnet.ln_routing)
+               return;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 0;
+       the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
+
+       tiny_router_buffers = 0;
+       small_router_buffers = 0;
+       large_router_buffers = 0;
+       lnet_net_unlock(LNET_LOCK_EX);
+       lnet_rtrpools_free(1);
+}
+
 int
 lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, unsigned long when)
 {
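
Taken together, lnet_rtrpools_adjust(), lnet_rtrpools_enable() and lnet_rtrpools_disable() form the runtime-configuration surface this patch adds. The sketch below shows the intended calling sequence as if appended to router.c; the dlc_* wrapper names are hypothetical, and only the three lnet_rtrpools_* calls come from this patch (a negative pool size leaves that pool unchanged, per lnet_rtrpools_adjust()).

/* Hypothetical configuration wrappers, not part of the patch. */
int dlc_set_routing(int enable)
{
	if (enable)
		return lnet_rtrpools_enable();

	lnet_rtrpools_disable();
	return 0;
}

int dlc_set_buffers(int tiny, int small, int large)
{
	/* A negative value leaves that pool's size unchanged. */
	return lnet_rtrpools_adjust(tiny, small, large);
}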