Merge branch 'for-3.8' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/percpu
[deliverable/linux.git] / kernel / rcutree.c
index 74df86bd9204aef5ec9d14ca8b4b0094777bef87..e441b77b614e1eb74b1f764710dc13f2398de17a 100644 (file)
@@ -68,9 +68,9 @@ static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
        .level = { &sname##_state.node[0] }, \
        .call = cr, \
        .fqs_state = RCU_GP_IDLE, \
-       .gpnum = -300, \
-       .completed = -300, \
-       .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.onofflock), \
+       .gpnum = 0UL - 300UL, \
+       .completed = 0UL - 300UL, \
+       .orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
        .orphan_nxttail = &sname##_state.orphan_nxtlist, \
        .orphan_donetail = &sname##_state.orphan_donelist, \
        .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
@@ -207,18 +207,15 @@ EXPORT_SYMBOL_GPL(rcu_note_context_switch);
 DEFINE_PER_CPU(struct rcu_dynticks, rcu_dynticks) = {
        .dynticks_nesting = DYNTICK_TASK_EXIT_IDLE,
        .dynticks = ATOMIC_INIT(1),
-#if defined(CONFIG_RCU_USER_QS) && !defined(CONFIG_RCU_USER_QS_FORCE)
-       .ignore_user_qs = true,
-#endif
 };
 
-static int blimit = 10;                /* Maximum callbacks per rcu_do_batch. */
-static int qhimark = 10000;    /* If this many pending, ignore blimit. */
-static int qlowmark = 100;     /* Once only this many pending, use blimit. */
+static long blimit = 10;       /* Maximum callbacks per rcu_do_batch. */
+static long qhimark = 10000;   /* If this many pending, ignore blimit. */
+static long qlowmark = 100;    /* Once only this many pending, use blimit. */
 
-module_param(blimit, int, 0444);
-module_param(qhimark, int, 0444);
-module_param(qlowmark, int, 0444);
+module_param(blimit, long, 0444);
+module_param(qhimark, long, 0444);
+module_param(qlowmark, long, 0444);
 
 int rcu_cpu_stall_suppress __read_mostly; /* 1 = suppress stall warnings. */
 int rcu_cpu_stall_timeout __read_mostly = CONFIG_RCU_CPU_STALL_TIMEOUT;
@@ -303,7 +300,8 @@ EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state);
 static int
 cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 {
-       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+              rdp->nxttail[RCU_DONE_TAIL] != NULL;
 }
 
 /*
@@ -312,8 +310,11 @@ cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-       return *rdp->nxttail[RCU_DONE_TAIL +
-                            ACCESS_ONCE(rsp->completed) != rdp->completed] &&
+       struct rcu_head **ntp;
+
+       ntp = rdp->nxttail[RCU_DONE_TAIL +
+                          (ACCESS_ONCE(rsp->completed) != rdp->completed)];
+       return rdp->nxttail[RCU_DONE_TAIL] && ntp && *ntp &&
               !rcu_gp_in_progress(rsp);
 }
 
@@ -416,29 +417,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_enter);
  */
 void rcu_user_enter(void)
 {
-       unsigned long flags;
-       struct rcu_dynticks *rdtp;
-
-       /*
-        * Some contexts may involve an exception occuring in an irq,
-        * leading to that nesting:
-        * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
-        * This would mess up the dyntick_nesting count though. And rcu_irq_*()
-        * helpers are enough to protect RCU uses inside the exception. So
-        * just return immediately if we detect we are in an IRQ.
-        */
-       if (in_interrupt())
-               return;
-
-       WARN_ON_ONCE(!current->mm);
-
-       local_irq_save(flags);
-       rdtp = &__get_cpu_var(rcu_dynticks);
-       if (!rdtp->ignore_user_qs && !rdtp->in_user) {
-               rdtp->in_user = true;
-               rcu_eqs_enter(true);
-       }
-       local_irq_restore(flags);
+       rcu_eqs_enter(1);
 }
 
 /**
@@ -575,27 +554,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_exit);
  */
 void rcu_user_exit(void)
 {
-       unsigned long flags;
-       struct rcu_dynticks *rdtp;
-
-       /*
-        * Some contexts may involve an exception occuring in an irq,
-        * leading to that nesting:
-        * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
-        * This would mess up the dyntick_nesting count though. And rcu_irq_*()
-        * helpers are enough to protect RCU uses inside the exception. So
-        * just return immediately if we detect we are in an IRQ.
-        */
-       if (in_interrupt())
-               return;
-
-       local_irq_save(flags);
-       rdtp = &__get_cpu_var(rcu_dynticks);
-       if (rdtp->in_user) {
-               rdtp->in_user = false;
-               rcu_eqs_exit(true);
-       }
-       local_irq_restore(flags);
+       rcu_eqs_exit(1);
 }
 
 /**
@@ -718,21 +677,6 @@ int rcu_is_cpu_idle(void)
 }
 EXPORT_SYMBOL(rcu_is_cpu_idle);
 
-#ifdef CONFIG_RCU_USER_QS
-void rcu_user_hooks_switch(struct task_struct *prev,
-                          struct task_struct *next)
-{
-       struct rcu_dynticks *rdtp;
-
-       /* Interrupts are disabled in context switch */
-       rdtp = &__get_cpu_var(rcu_dynticks);
-       if (!rdtp->ignore_user_qs) {
-               clear_tsk_thread_flag(prev, TIF_NOHZ);
-               set_tsk_thread_flag(next, TIF_NOHZ);
-       }
-}
-#endif /* #ifdef CONFIG_RCU_USER_QS */
-
 #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU)
 
 /*
@@ -873,6 +817,29 @@ static void record_gp_stall_check_time(struct rcu_state *rsp)
        rsp->jiffies_stall = jiffies + jiffies_till_stall_check();
 }
 
+/*
+ * Dump stacks of all tasks running on stalled CPUs.  This is a fallback
+ * for architectures that do not implement trigger_all_cpu_backtrace().
+ * The NMI-triggered stack traces are more accurate because they are
+ * printed by the target CPU.
+ */
+static void rcu_dump_cpu_stacks(struct rcu_state *rsp)
+{
+       int cpu;
+       unsigned long flags;
+       struct rcu_node *rnp;
+
+       rcu_for_each_leaf_node(rsp, rnp) {
+               raw_spin_lock_irqsave(&rnp->lock, flags);
+               if (rnp->qsmask != 0) {
+                       for (cpu = 0; cpu <= rnp->grphi - rnp->grplo; cpu++)
+                               if (rnp->qsmask & (1UL << cpu))
+                                       dump_cpu_task(rnp->grplo + cpu);
+               }
+               raw_spin_unlock_irqrestore(&rnp->lock, flags);
+       }
+}
+
 static void print_other_cpu_stall(struct rcu_state *rsp)
 {
        int cpu;
@@ -880,6 +847,7 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
        unsigned long flags;
        int ndetected = 0;
        struct rcu_node *rnp = rcu_get_root(rsp);
+       long totqlen = 0;
 
        /* Only let one CPU complain about others per time interval. */
 
@@ -924,12 +892,15 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
 
        print_cpu_stall_info_end();
-       printk(KERN_CONT "(detected by %d, t=%ld jiffies)\n",
-              smp_processor_id(), (long)(jiffies - rsp->gp_start));
+       for_each_possible_cpu(cpu)
+               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+       pr_cont("(detected by %d, t=%ld jiffies, g=%lu, c=%lu, q=%lu)\n",
+              smp_processor_id(), (long)(jiffies - rsp->gp_start),
+              rsp->gpnum, rsp->completed, totqlen);
        if (ndetected == 0)
                printk(KERN_ERR "INFO: Stall ended before state dump start\n");
        else if (!trigger_all_cpu_backtrace())
-               dump_stack();
+               rcu_dump_cpu_stacks(rsp);
 
        /* Complain about tasks blocking the grace period. */
 
@@ -940,8 +911,10 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
 
 static void print_cpu_stall(struct rcu_state *rsp)
 {
+       int cpu;
        unsigned long flags;
        struct rcu_node *rnp = rcu_get_root(rsp);
+       long totqlen = 0;
 
        /*
         * OK, time to rat on ourselves...
@@ -952,7 +925,10 @@ static void print_cpu_stall(struct rcu_state *rsp)
        print_cpu_stall_info_begin();
        print_cpu_stall_info(rsp, smp_processor_id());
        print_cpu_stall_info_end();
-       printk(KERN_CONT " (t=%lu jiffies)\n", jiffies - rsp->gp_start);
+       for_each_possible_cpu(cpu)
+               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+       pr_cont(" (t=%lu jiffies g=%lu c=%lu q=%lu)\n",
+               jiffies - rsp->gp_start, rsp->gpnum, rsp->completed, totqlen);
        if (!trigger_all_cpu_backtrace())
                dump_stack();
 
@@ -1091,6 +1067,7 @@ static void init_callback_list(struct rcu_data *rdp)
        rdp->nxtlist = NULL;
        for (i = 0; i < RCU_NEXT_SIZE; i++)
                rdp->nxttail[i] = &rdp->nxtlist;
+       init_nocb_callback_list(rdp);
 }
 
 /*
@@ -1404,15 +1381,37 @@ rcu_start_gp(struct rcu_state *rsp, unsigned long flags)
            !cpu_needs_another_gp(rsp, rdp)) {
                /*
                 * Either we have not yet spawned the grace-period
-                * task or this CPU does not need another grace period.
+                * task, this CPU does not need another grace period,
+                * or a grace period is already in progress.
                 * Either way, don't start a new grace period.
                 */
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
                return;
        }
 
+       /*
+        * Because there is no grace period in progress right now,
+        * any callbacks we have up to this point will be satisfied
+        * by the next grace period.  So promote all callbacks to be
+        * handled after the end of the next grace period.  If the
+        * CPU is not yet aware of the end of the previous grace period,
+        * we need to allow for the callback advancement that will
+        * occur when it does become aware.  Deadlock prevents us from
+        * making it aware at this point: We cannot acquire a leaf
+        * rcu_node ->lock while holding the root rcu_node ->lock.
+        */
+       rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+       if (rdp->completed == rsp->completed)
+               rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
        rsp->gp_flags = RCU_GP_FLAG_INIT;
-       raw_spin_unlock_irqrestore(&rnp->lock, flags);
+       raw_spin_unlock(&rnp->lock); /* Interrupts remain disabled. */
+
+       /* Ensure that CPU is aware of completion of last grace period. */
+       rcu_process_gp_end(rsp, rdp);
+       local_irq_restore(flags);
+
+       /* Wake up rcu_gp_kthread() to start the grace period. */
        wake_up(&rsp->gp_wq);
 }
 
@@ -1573,16 +1572,20 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 /*
  * Send the specified CPU's RCU callbacks to the orphanage.  The
  * specified CPU must be offline, and the caller must hold the
- * ->onofflock.
+ * ->orphan_lock.
  */
 static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
                          struct rcu_node *rnp, struct rcu_data *rdp)
 {
+       /* No-CBs CPUs do not have orphanable callbacks. */
+       if (is_nocb_cpu(rdp->cpu))
+               return;
+
        /*
         * Orphan the callbacks.  First adjust the counts.  This is safe
-        * because ->onofflock excludes _rcu_barrier()'s adoption of
-        * the callbacks, thus no memory barrier is required.
+        * because _rcu_barrier() excludes CPU-hotplug operations, so it
+        * cannot be running now.  Thus no memory barrier is required.
         */
        if (rdp->nxtlist != NULL) {
                rsp->qlen_lazy += rdp->qlen_lazy;
@@ -1623,13 +1626,17 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 
 /*
  * Adopt the RCU callbacks from the specified rcu_state structure's
- * orphanage.  The caller must hold the ->onofflock.
+ * orphanage.  The caller must hold the ->orphan_lock.
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
 {
        int i;
        struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
+       /* No-CBs CPUs are handled specially. */
+       if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+               return;
+
        /* Do the accounting first. */
        rdp->qlen_lazy += rsp->qlen_lazy;
        rdp->qlen += rsp->qlen;
@@ -1702,7 +1709,7 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
        /* Exclude any attempts to start a new grace period. */
        mutex_lock(&rsp->onoff_mutex);
-       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       raw_spin_lock_irqsave(&rsp->orphan_lock, flags);
 
        /* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
        rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
@@ -1729,10 +1736,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
        /*
         * We still hold the leaf rcu_node structure lock here, and
         * irqs are still disabled.  The reason for this subterfuge is
-        * because invoking rcu_report_unblock_qs_rnp() with ->onofflock
+        * because invoking rcu_report_unblock_qs_rnp() with ->orphan_lock
         * held leads to deadlock.
         */
-       raw_spin_unlock(&rsp->onofflock); /* irqs remain disabled. */
+       raw_spin_unlock(&rsp->orphan_lock); /* irqs remain disabled. */
        rnp = rdp->mynode;
        if (need_report & RCU_OFL_TASKS_NORM_GP)
                rcu_report_unblock_qs_rnp(rnp, flags);
@@ -1769,7 +1776,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
        unsigned long flags;
        struct rcu_head *next, *list, **tail;
-       int bl, count, count_lazy, i;
+       long bl, count, count_lazy;
+       int i;
 
        /* If no callbacks are ready, just return.*/
        if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
@@ -2107,9 +2115,15 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
        }
 }
 
+/*
+ * Helper function for call_rcu() and friends.  The cpu argument will
+ * normally be -1, indicating "currently running CPU".  It may specify
+ * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
 static void
 __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
-          struct rcu_state *rsp, bool lazy)
+          struct rcu_state *rsp, int cpu, bool lazy)
 {
        unsigned long flags;
        struct rcu_data *rdp;
@@ -2129,9 +2143,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+               int offline;
+
+               if (cpu != -1)
+                       rdp = per_cpu_ptr(rsp->rda, cpu);
+               offline = !__call_rcu_nocb(rdp, head, lazy);
+               WARN_ON_ONCE(offline);
                /* _call_rcu() is illegal on offline CPU; leak the callback. */
-               WARN_ON_ONCE(1);
                local_irq_restore(flags);
                return;
        }
@@ -2160,7 +2179,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
  */
 void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_sched_state, 0);
+       __call_rcu(head, func, &rcu_sched_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
@@ -2169,7 +2188,7 @@ EXPORT_SYMBOL_GPL(call_rcu_sched);
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_bh_state, 0);
+       __call_rcu(head, func, &rcu_bh_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -2205,10 +2224,28 @@ static inline int rcu_blocking_is_gp(void)
  * rcu_read_lock_sched().
  *
  * This means that all preempt_disable code sequences, including NMI and
- * hardware-interrupt handlers, in progress on entry will have completed
- * before this primitive returns.  However, this does not guarantee that
- * softirq handlers will have completed, since in some kernels, these
- * handlers can run in process context, and can block.
+ * non-threaded hardware-interrupt handlers, in progress on entry will
+ * have completed before this primitive returns.  However, this does not
+ * guarantee that softirq handlers will have completed, since in some
+ * kernels, these handlers can run in process context, and can block.
+ *
+ * Note that this guarantee implies further memory-ordering guarantees.
+ * On systems with more than one CPU, when synchronize_sched() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since the
+ * end of its last RCU-sched read-side critical section whose beginning
+ * preceded the call to synchronize_sched().  In addition, each CPU having
+ * an RCU read-side critical section that extends beyond the return from
+ * synchronize_sched() is guaranteed to have executed a full memory barrier
+ * after the beginning of synchronize_sched() and before the beginning of
+ * that RCU read-side critical section.  Note that these guarantees include
+ * CPUs that are offline, idle, or executing in user mode, as well as CPUs
+ * that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_sched(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_sched() -- even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
  *
  * This primitive provides the guarantees made by the (now removed)
  * synchronize_kernel() API.  In contrast, synchronize_rcu() only
@@ -2224,7 +2261,10 @@ void synchronize_sched(void)
                           "Illegal synchronize_sched() in RCU-sched read-side critical section");
        if (rcu_blocking_is_gp())
                return;
-       wait_rcu_gp(call_rcu_sched);
+       if (rcu_expedited)
+               synchronize_sched_expedited();
+       else
+               wait_rcu_gp(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(synchronize_sched);
 
@@ -2236,6 +2276,9 @@ EXPORT_SYMBOL_GPL(synchronize_sched);
  * read-side critical sections have completed.  RCU read-side critical
  * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
  * and may be nested.
+ *
+ * See the description of synchronize_sched() for more detailed information
+ * on memory ordering guarantees.
  */
 void synchronize_rcu_bh(void)
 {
@@ -2245,13 +2288,13 @@ void synchronize_rcu_bh(void)
                           "Illegal synchronize_rcu_bh() in RCU-bh read-side critical section");
        if (rcu_blocking_is_gp())
                return;
-       wait_rcu_gp(call_rcu_bh);
+       if (rcu_expedited)
+               synchronize_rcu_bh_expedited();
+       else
+               wait_rcu_gp(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
-static atomic_t sync_sched_expedited_started = ATOMIC_INIT(0);
-static atomic_t sync_sched_expedited_done = ATOMIC_INIT(0);
-
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
        /*
@@ -2308,10 +2351,32 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
  */
 void synchronize_sched_expedited(void)
 {
-       int firstsnap, s, snap, trycount = 0;
+       long firstsnap, s, snap;
+       int trycount = 0;
+       struct rcu_state *rsp = &rcu_sched_state;
+
+       /*
+        * If we are in danger of counter wrap, just do synchronize_sched().
+        * By allowing sync_sched_expedited_started to advance no more than
+        * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
+        * that more than 3.5 billion CPUs would be required to force a
+        * counter wrap on a 32-bit system.  Quite a few more CPUs would of
+        * course be required on a 64-bit system.
+        */
+       if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
+                        (ulong)atomic_long_read(&rsp->expedited_done) +
+                        ULONG_MAX / 8)) {
+               synchronize_sched();
+               atomic_long_inc(&rsp->expedited_wrap);
+               return;
+       }
 
-       /* Note that atomic_inc_return() implies full memory barrier. */
-       firstsnap = snap = atomic_inc_return(&sync_sched_expedited_started);
+       /*
+        * Take a ticket.  Note that atomic_inc_return() implies a
+        * full memory barrier.
+        */
+       snap = atomic_long_inc_return(&rsp->expedited_start);
+       firstsnap = snap;
        get_online_cpus();
        WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
@@ -2323,48 +2388,65 @@ void synchronize_sched_expedited(void)
                             synchronize_sched_expedited_cpu_stop,
                             NULL) == -EAGAIN) {
                put_online_cpus();
+               atomic_long_inc(&rsp->expedited_tryfail);
+
+               /* Check to see if someone else did our work for us. */
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_workdone1);
+                       return;
+               }
 
                /* No joy, try again later.  Or just synchronize_sched(). */
                if (trycount++ < 10) {
                        udelay(trycount * num_online_cpus());
                } else {
-                       synchronize_sched();
+                       wait_rcu_gp(call_rcu_sched);
+                       atomic_long_inc(&rsp->expedited_normal);
                        return;
                }
 
-               /* Check to see if someone else did our work for us. */
-               s = atomic_read(&sync_sched_expedited_done);
-               if (UINT_CMP_GE((unsigned)s, (unsigned)firstsnap)) {
-                       smp_mb(); /* ensure test happens before caller kfree */
+               /* Recheck to see if someone else did our work for us. */
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_workdone2);
                        return;
                }
 
                /*
                 * Refetching sync_sched_expedited_started allows later
-                * callers to piggyback on our grace period.  We subtract
-                * 1 to get the same token that the last incrementer got.
-                * We retry after they started, so our grace period works
-                * for them, and they started after our first try, so their
-                * grace period works for us.
+                * callers to piggyback on our grace period.  We retry
+                * after they started, so our grace period works for them,
+                * and they started after our first try, so their grace
+                * period works for us.
                 */
                get_online_cpus();
-               snap = atomic_read(&sync_sched_expedited_started);
+               snap = atomic_long_read(&rsp->expedited_start);
                smp_mb(); /* ensure read is before try_stop_cpus(). */
        }
+       atomic_long_inc(&rsp->expedited_stoppedcpus);
 
        /*
         * Everyone up to our most recent fetch is covered by our grace
         * period.  Update the counter, but only if our work is still
         * relevant -- which it won't be if someone who started later
-        * than we did beat us to the punch.
+        * than we did already did their update.
         */
        do {
-               s = atomic_read(&sync_sched_expedited_done);
-               if (UINT_CMP_GE((unsigned)s, (unsigned)snap)) {
-                       smp_mb(); /* ensure test happens before caller kfree */
+               atomic_long_inc(&rsp->expedited_done_tries);
+               s = atomic_long_read(&rsp->expedited_done);
+               if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
+                       /* ensure test happens before caller kfree */
+                       smp_mb__before_atomic_inc(); /* ^^^ */
+                       atomic_long_inc(&rsp->expedited_done_lost);
                        break;
                }
-       } while (atomic_cmpxchg(&sync_sched_expedited_done, s, snap) != s);
+       } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
+       atomic_long_inc(&rsp->expedited_done_exit);
 
        put_online_cpus();
 }
@@ -2558,9 +2640,17 @@ static void _rcu_barrier(struct rcu_state *rsp)
         * When that callback is invoked, we will know that all of the
         * corresponding CPU's preceding callbacks have been invoked.
         */
-       for_each_online_cpu(cpu) {
+       for_each_possible_cpu(cpu) {
+               if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+                       continue;
                rdp = per_cpu_ptr(rsp->rda, cpu);
-               if (ACCESS_ONCE(rdp->qlen)) {
+               if (is_nocb_cpu(cpu)) {
+                       _rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+                                          rsp->n_barrier_done);
+                       atomic_inc(&rsp->barrier_cpu_count);
+                       __call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+                                  rsp, cpu, 0);
+               } else if (ACCESS_ONCE(rdp->qlen)) {
                        _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                           rsp->n_barrier_done);
                        smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -2634,6 +2724,7 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
 #endif
        rdp->cpu = cpu;
        rdp->rsp = rsp;
+       rcu_boot_init_nocb_percpu_data(rdp);
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
 }
 
@@ -2715,6 +2806,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
        struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
        struct rcu_node *rnp = rdp->mynode;
        struct rcu_state *rsp;
+       int ret = NOTIFY_OK;
 
        trace_rcu_utilization("Start CPU hotplug");
        switch (action) {
@@ -2728,7 +2820,10 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                rcu_boost_kthread_setaffinity(rnp, -1);
                break;
        case CPU_DOWN_PREPARE:
-               rcu_boost_kthread_setaffinity(rnp, cpu);
+               if (nocb_cpu_expendable(cpu))
+                       rcu_boost_kthread_setaffinity(rnp, cpu);
+               else
+                       ret = NOTIFY_BAD;
                break;
        case CPU_DYING:
        case CPU_DYING_FROZEN:
@@ -2752,7 +2847,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                break;
        }
        trace_rcu_utilization("End CPU hotplug");
-       return NOTIFY_OK;
+       return ret;
 }
 
 /*
@@ -2772,6 +2867,7 @@ static int __init rcu_spawn_gp_kthread(void)
                raw_spin_lock_irqsave(&rnp->lock, flags);
                rsp->gp_kthread = t;
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
+               rcu_spawn_nocb_kthreads(rsp);
        }
        return 0;
 }
@@ -2967,6 +3063,7 @@ void __init rcu_init(void)
        rcu_init_one(&rcu_sched_state, &rcu_sched_data);
        rcu_init_one(&rcu_bh_state, &rcu_bh_data);
        __rcu_init_preempt();
+       rcu_init_nocb();
         open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 
        /*
This page took 0.045703 seconds and 5 git commands to generate.