drbd: don't count sendpage()d pages only referenced by tcp as in use
[deliverable/linux.git] / drivers / block / drbd / drbd_receiver.c
index ec1711f7c5c56db8d2d089e849728a29f5f62980..2c3edf0ac5ca5d93ee3634d5e7ebc7a289202977 100644 (file)
@@ -241,7 +241,7 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 }
 
 /**
@@ -298,9 +298,11 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool
  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
  * Either links the page chain back to the global pool,
  * or returns all pages to the system. */
-static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
+static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 {
+       atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;
+
        if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
                i = page_chain_free(page);
        else {
@@ -311,10 +313,10 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
-       atomic_sub(i, &mdev->pp_in_use);
-       i = atomic_read(&mdev->pp_in_use);
+       i = atomic_sub_return(i, a);
        if (i < 0)
-               dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
+               dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
+                       is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
 }
 
@@ -365,7 +367,6 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        e->size = data_size;
        e->flags = 0;
        e->sector = sector;
-       e->sector = sector;
        e->block_id = id;
 
        return e;
@@ -375,9 +376,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        return NULL;
 }
 
-void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
+void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
 {
-       drbd_pp_free(mdev, e->pages);
+       if (e->flags & EE_HAS_DIGEST)
+               kfree(e->digest);
+       drbd_pp_free(mdev, e->pages, is_net);
        D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
@@ -388,13 +391,14 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;
+       int is_net = list == &mdev->net_ee;
 
        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &work_list, w.list) {
-               drbd_free_ee(mdev, e);
+               drbd_free_some_ee(mdev, e, is_net);
                count++;
        }
        return count;
@@ -423,7 +427,7 @@ static int drbd_process_done_ee(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 
        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_ack.
@@ -719,14 +723,14 @@ out:
 static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.sbuf.header.h80;
 
        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
 }
 
 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
        int rr;
 
        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
@@ -776,9 +780,6 @@ static int drbd_connect(struct drbd_conf *mdev)
 
        D_ASSERT(!mdev->data.socket);
 
-       if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
-               dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
-
        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;
 
@@ -927,6 +928,11 @@ retry:
 
        drbd_thread_start(&mdev->asender);
 
+       if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
+               drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
+               put_ldev(mdev);
+       }
+
        if (!drbd_send_protocol(mdev))
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
@@ -946,22 +952,27 @@ out_release_sockets:
        return -1;
 }
 
-static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
+static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
 {
+       union p_header *h = &mdev->data.rbuf.header;
        int r;
 
        r = drbd_recv(mdev, h, sizeof(*h));
-
        if (unlikely(r != sizeof(*h))) {
                dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
                return FALSE;
-       };
-       h->command = be16_to_cpu(h->command);
-       h->length  = be16_to_cpu(h->length);
-       if (unlikely(h->magic != BE_DRBD_MAGIC)) {
+       }
+
+       if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
+               *cmd = be16_to_cpu(h->h80.command);
+               *packet_size = be16_to_cpu(h->h80.length);
+       } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
+               *cmd = be16_to_cpu(h->h95.command);
+               *packet_size = be32_to_cpu(h->h95.length);
+       } else {
                dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
-                   (long)be32_to_cpu(h->magic),
-                   h->command, h->length);
+                   (long)be32_to_cpu(h->h80.magic),
+                   h->h80.command, h->h80.length);
                return FALSE;
        }
        mdev->last_received = jiffies;
@@ -1180,7 +1191,7 @@ next_bio:
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        /* we special case some flags in the multi-bio case, see below
-        * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+        * (REQ_UNPLUG, REQ_HARDBARRIER) */
        bio->bi_rw = rw;
        bio->bi_private = e;
        bio->bi_end_io = drbd_endio_sec;
@@ -1209,16 +1220,16 @@ next_bio:
                bios = bios->bi_next;
                bio->bi_next = NULL;
 
-               /* strip off BIO_RW_UNPLUG unless it is the last bio */
+               /* strip off REQ_UNPLUG unless it is the last bio */
                if (bios)
-                       bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+                       bio->bi_rw &= ~REQ_UNPLUG;
 
                drbd_generic_make_request(mdev, fault_type, bio);
 
-               /* strip off BIO_RW_BARRIER,
+               /* strip off REQ_HARDBARRIER,
                 * unless it is the first or last bio */
                if (bios && bios->bi_next)
-                       bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+                       bios->bi_rw &= ~REQ_HARDBARRIER;
        } while (bios);
        maybe_kick_lo(mdev);
        return 0;
@@ -1233,7 +1244,7 @@ fail:
 }
 
 /**
- * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
+ * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
  * @mdev:      DRBD device.
  * @w:         work object.
  * @cancel:    The connection will be closed anyways (unused in this callback)
@@ -1245,7 +1256,7 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
           (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
           so that we can finish that epoch in drbd_may_finish_epoch().
           That is necessary if we already have a long chain of Epochs, before
-          we realize that BIO_RW_BARRIER is actually not supported */
+          we realize that REQ_HARDBARRIER is actually not supported */
 
        /* As long as the -ENOTSUPP on the barrier is reported immediately
           that will never trigger. If it is reported late, we will just
@@ -1268,17 +1279,12 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
        return 1;
 }
 
-static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        int rv, issue_flush;
-       struct p_barrier *p = (struct p_barrier *)h;
+       struct p_barrier *p = &mdev->data.rbuf.barrier;
        struct drbd_epoch *epoch;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-
-       rv = drbd_recv(mdev, h->payload, h->length);
-       ERR_IF(rv != h->length) return FALSE;
-
        inc_unacked(mdev);
 
        if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
@@ -1457,7 +1463,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
                data_size -= rr;
        }
        kunmap(page);
-       drbd_pp_free(mdev, page);
+       drbd_pp_free(mdev, page, 0);
        return rv;
 }
 
@@ -1562,6 +1568,7 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_ev);
        if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
                return TRUE;
 
@@ -1571,21 +1578,12 @@ fail:
        return FALSE;
 }
 
-static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct drbd_request *req;
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
 
@@ -1611,20 +1609,11 @@ static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
        D_ASSERT(p->block_id == ID_SYNCER);
@@ -1643,6 +1632,8 @@ static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
                drbd_send_ack_dp(mdev, P_NEG_ACK, p);
        }
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_in);
+
        return ok;
 }
 
@@ -1765,24 +1756,27 @@ static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
        return ret;
 }
 
+static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
+{
+       if (mdev->agreed_pro_version >= 95)
+               return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
+                       (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
+                       (dpf & DP_FUA ? REQ_FUA : 0) |
+                       (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
+                       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
+       else
+               return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
+}
+
 /* mirrored write */
-static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
        struct drbd_epoch_entry *e;
-       struct p_data *p = (struct p_data *)h;
-       int header_size, data_size;
+       struct p_data *p = &mdev->data.rbuf.data;
        int rw = WRITE;
        u32 dp_flags;
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        if (!get_ldev(mdev)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not write mirrored data block "
@@ -1824,14 +1818,14 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
                if (epoch == e->epoch) {
                        set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-                       rw |= (1<<BIO_RW_BARRIER);
+                       rw |= REQ_HARDBARRIER;
                        e->flags |= EE_IS_BARRIER;
                } else {
                        if (atomic_read(&epoch->epoch_size) > 1 ||
                            !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
                                set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
                                set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-                               rw |= (1<<BIO_RW_BARRIER);
+                               rw |= REQ_HARDBARRIER;
                                e->flags |= EE_IS_BARRIER;
                        }
                }
@@ -1839,12 +1833,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
        spin_unlock(&mdev->epoch_lock);
 
        dp_flags = be32_to_cpu(p->dp_flags);
-       if (dp_flags & DP_HARDBARRIER) {
-               dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
-               /* rw |= (1<<BIO_RW_BARRIER); */
-       }
-       if (dp_flags & DP_RW_SYNC)
-               rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
+       rw |= write_flags_to_bio(mdev, dp_flags);
+
        if (dp_flags & DP_MAY_SET_IN_SYNC)
                e->flags |= EE_MAY_SET_IN_SYNC;
 
@@ -2016,20 +2006,64 @@ out_interrupted:
        return FALSE;
 }
 
-static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
+/* We may throttle resync, if the lower device seems to be busy,
+ * and current sync rate is above c_min_rate.
+ *
+ * To decide whether or not the lower device is busy, we use a scheme similar
+ * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
+ * (more than 64 sectors) of activity we cannot account for with our own resync
+ * activity, it obviously is "busy".
+ *
+ * The current sync rate used here uses only the most recent two step marks,
+ * to have a short time average so we can react faster.
+ */
+int drbd_rs_should_slow_down(struct drbd_conf *mdev)
+{
+       struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
+       unsigned long db, dt, dbdt;
+       int curr_events;
+       int throttle = 0;
+
+       /* feature disabled? */
+       if (mdev->sync_conf.c_min_rate == 0)
+               return 0;
+
+       curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
+                     (int)part_stat_read(&disk->part0, sectors[1]) -
+                       atomic_read(&mdev->rs_sect_ev);
+       if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
+               unsigned long rs_left;
+               int i;
+
+               mdev->rs_last_events = curr_events;
+
+               /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
+                * approx. */
+               i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
+               rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
+
+               dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
+               if (!dt)
+                       dt++;
+               db = mdev->rs_mark_left[i] - rs_left;
+               dbdt = Bit2KB(db/dt);
+
+               if (dbdt > mdev->sync_conf.c_min_rate)
+                       throttle = 1;
+       }
+       return throttle;
+}
+
+
+static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
 {
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct digest_info *di = NULL;
-       int size, digest_size;
+       int size;
        unsigned int fault_type;
-       struct p_block_req *p =
-               (struct p_block_req *)h;
-       const int brps = sizeof(*p)-sizeof(*h);
-
-       if (drbd_recv(mdev, h->payload, brps) != brps)
-               return FALSE;
+       struct p_block_req *p = &mdev->data.rbuf.block_req;
 
        sector = be64_to_cpu(p->sector);
        size   = be32_to_cpu(p->blksize);
@@ -2049,9 +2083,9 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                if (__ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not satisfy peer's read request, "
                            "no local data.\n");
-               drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
+               drbd_send_ack_rp(mdev, cmd == P_DATA_REQUEST ? P_NEG_DREPLY :
                                 P_NEG_RS_DREPLY , p);
-               return drbd_drain_block(mdev, h->length - brps);
+               return TRUE;
        }
 
        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
@@ -2063,31 +2097,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       switch (h->command) {
+       switch (cmd) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
                fault_type = DRBD_FAULT_DT_RD;
-               break;
+               /* application IO, don't drbd_rs_begin_io */
+               goto submit;
+
        case P_RS_DATA_REQUEST:
                e->w.cb = w_e_end_rsdata_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronously. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
        case P_OV_REPLY:
        case P_CSUM_RS_REQUEST:
                fault_type = DRBD_FAULT_RS_RD;
-               digest_size = h->length - brps ;
                di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
                if (!di)
                        goto out_free_e;
@@ -2095,23 +2119,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                di->digest_size = digest_size;
                di->digest = (((char *)di)+sizeof(struct digest_info));
 
+               e->digest = di;
+               e->flags |= EE_HAS_DIGEST;
+
                if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
                        goto out_free_e;
 
-               e->block_id = (u64)(unsigned long)di;
-               if (h->command == P_CSUM_RS_REQUEST) {
+               if (cmd == P_CSUM_RS_REQUEST) {
                        D_ASSERT(mdev->agreed_pro_version >= 89);
                        e->w.cb = w_e_end_csum_rs_req;
-               } else if (h->command == P_OV_REPLY) {
+               } else if (cmd == P_OV_REPLY) {
                        e->w.cb = w_e_end_ov_reply;
                        dec_rs_pending(mdev);
-                       break;
-               }
-
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted, probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
+                       /* drbd_rs_begin_io done when we sent this request,
+                        * but accounting still needs to be done. */
+                       goto submit_for_resync;
                }
                break;
 
@@ -2130,37 +2152,55 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                }
                e->w.cb = w_e_end_ov_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronous. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
-
        default:
                dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
-                   cmdname(h->command));
+                   cmdname(cmd));
                fault_type = DRBD_FAULT_MAX;
+               goto out_free_e;
        }
 
-       spin_lock_irq(&mdev->req_lock);
-       list_add(&e->w.list, &mdev->read_ee);
-       spin_unlock_irq(&mdev->req_lock);
+       /* Throttle, drbd_rs_begin_io and submit should become asynchronous
+        * wrt the receiver, but it is not as straightforward as it may seem.
+        * Various places in the resync start and stop logic assume resync
+        * requests are processed in order, requeuing this on the worker thread
+        * introduces a bunch of new code for synchronization between threads.
+        *
+        * Unlimited throttling before drbd_rs_begin_io may stall the resync
+        * "forever", throttling after drbd_rs_begin_io will lock that extent
+        * for application writes for the same time.  For now, just throttle
+        * here, where the rest of the code expects the receiver to sleep for
+        * a while, anyways.
+        */
 
+       /* Throttle before drbd_rs_begin_io, as that locks out application IO;
+        * this defers syncer requests for some time, before letting at least
+        * one request through.  The resync controller on the receiving side
+        * will adapt to the incoming rate accordingly.
+        *
+        * We cannot throttle here if remote is Primary/SyncTarget:
+        * we would also throttle its application reads.
+        * In that case, throttling is done on the SyncTarget only.
+        */
+       if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
+               msleep(100);
+       if (drbd_rs_begin_io(mdev, e->sector))
+               goto out_free_e;
+
+submit_for_resync:
+       atomic_add(size >> 9, &mdev->rs_sect_ev);
+
+submit:
        inc_unacked(mdev);
+       spin_lock_irq(&mdev->req_lock);
+       list_add_tail(&e->w.list, &mdev->read_ee);
+       spin_unlock_irq(&mdev->req_lock);
 
        if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
                return TRUE;
 
 out_free_e:
-       kfree(di);
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return FALSE;
@@ -2699,20 +2739,13 @@ static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
        return 1;
 }
 
-static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
+static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_protocol *p = (struct p_protocol *)h;
-       int header_size, data_size;
+       struct p_protocol *p = &mdev->data.rbuf.protocol;
        int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
        int p_want_lose, p_two_primaries, cf;
        char p_integrity_alg[SHARED_SECRET_MAX] = "";
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        p_proto         = be32_to_cpu(p->protocol);
        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
@@ -2805,39 +2838,46 @@ struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
        return tfm;
 }
 
-static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
+static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
 {
        int ok = TRUE;
-       struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
+       struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
        unsigned int header_size, data_size, exp_max_sz;
        struct crypto_hash *verify_tfm = NULL;
        struct crypto_hash *csums_tfm = NULL;
        const int apv = mdev->agreed_pro_version;
+       int *rs_plan_s = NULL;
+       int fifo_size = 0;
 
        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
                    : apv == 88 ? sizeof(struct p_rs_param)
                                        + SHARED_SECRET_MAX
-                   : /* 89 */    sizeof(struct p_rs_param_89);
+                   : apv <= 94 ? sizeof(struct p_rs_param_89)
+                   : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
-       if (h->length > exp_max_sz) {
+       if (packet_size > exp_max_sz) {
                dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
-                   h->length, exp_max_sz);
+                   packet_size, exp_max_sz);
                return FALSE;
        }
 
        if (apv <= 88) {
-               header_size = sizeof(struct p_rs_param) - sizeof(*h);
-               data_size   = h->length  - header_size;
-       } else /* apv >= 89 */ {
-               header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
-               data_size   = h->length  - header_size;
+               header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+       } else if (apv <= 94) {
+               header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+               D_ASSERT(data_size == 0);
+       } else {
+               header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
                D_ASSERT(data_size == 0);
        }
 
        /* initialize verify_alg and csums_alg */
        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
+       if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
                return FALSE;
 
        mdev->sync_conf.rate      = be32_to_cpu(p->rate);
@@ -2896,6 +2936,22 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        }
                }
 
+               if (apv > 94) {
+                       mdev->sync_conf.rate      = be32_to_cpu(p->rate);
+                       mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
+                       mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
+                       mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
+                       mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
+
+                       fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
+                       if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
+                               rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
+                               if (!rs_plan_s) {
+                                       dev_err(DEV, "kzalloc of fifo_buffer failed\n");
+                                       goto disconnect;
+                               }
+                       }
+               }
 
                spin_lock(&mdev->peer_seq_lock);
                /* lock against drbd_nl_syncer_conf() */
@@ -2913,6 +2969,12 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        mdev->csums_tfm = csums_tfm;
                        dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
                }
+               if (fifo_size != mdev->rs_plan_s.size) {
+                       kfree(mdev->rs_plan_s.values);
+                       mdev->rs_plan_s.values = rs_plan_s;
+                       mdev->rs_plan_s.size   = fifo_size;
+                       mdev->rs_planed = 0;
+               }
                spin_unlock(&mdev->peer_seq_lock);
        }
 
@@ -2946,19 +3008,15 @@ static void warn_if_differ_considerably(struct drbd_conf *mdev,
                     (unsigned long long)a, (unsigned long long)b);
 }
 
-static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_sizes *p = (struct p_sizes *)h;
+       struct p_sizes *p = &mdev->data.rbuf.sizes;
        enum determine_dev_size dd = unchanged;
        unsigned int max_seg_s;
        sector_t p_size, p_usize, my_usize;
        int ldsc = 0; /* local disk size changed */
        enum dds_flags ddsf;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_size = be64_to_cpu(p->d_size);
        p_usize = be64_to_cpu(p->u_size);
 
@@ -3062,16 +3120,12 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
+static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_uuids *p = (struct p_uuids *)h;
+       struct p_uuids *p = &mdev->data.rbuf.uuids;
        u64 *p_uuid;
        int i;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
 
        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
@@ -3107,6 +3161,11 @@ static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
                        drbd_md_sync(mdev);
                }
                put_ldev(mdev);
+       } else if (mdev->state.disk < D_INCONSISTENT &&
+                  mdev->state.role == R_PRIMARY) {
+               /* I am a diskless primary, the peer just created a new current UUID
+                  for me. */
+               drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
        }
 
        /* Before we test for the disk state, we should wait until an eventually
@@ -3150,16 +3209,12 @@ static union drbd_state convert_state(union drbd_state ps)
        return ms;
 }
 
-static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_req_state *p = (struct p_req_state *)h;
+       struct p_req_state *p = &mdev->data.rbuf.req_state;
        union drbd_state mask, val;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        mask.i = be32_to_cpu(p->mask);
        val.i = be32_to_cpu(p->val);
 
@@ -3180,20 +3235,15 @@ static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_state *p = (struct p_state *)h;
+       struct p_state *p = &mdev->data.rbuf.state;
        enum drbd_conns nconn, oconn;
        union drbd_state ns, peer_state;
        enum drbd_disk_state real_peer_disk;
+       enum chg_state_flags cs_flags;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
-               return FALSE;
-
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        peer_state.i = be32_to_cpu(p->state);
 
        real_peer_disk = peer_state.disk;
@@ -3263,8 +3313,20 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
        if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
                ns.disk = mdev->new_state_tmp.disk;
-
-       rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
+       cs_flags = CS_VERBOSE + (oconn < C_CONNECTED && nconn >= C_CONNECTED ? 0 : CS_HARD);
+       if (ns.pdsk == D_CONSISTENT && ns.susp && nconn == C_CONNECTED && oconn < C_CONNECTED &&
+           test_bit(NEW_CUR_UUID, &mdev->flags)) {
+               /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
+                  for temporary network outages! */
+               spin_unlock_irq(&mdev->req_lock);
+               dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
+               tl_clear(mdev);
+               drbd_uuid_new_current(mdev);
+               clear_bit(NEW_CUR_UUID, &mdev->flags);
+               drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
+               return FALSE;
+       }
+       rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
        ns = mdev->state;
        spin_unlock_irq(&mdev->req_lock);
 
@@ -3291,9 +3353,9 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_rs_uuid *p = (struct p_rs_uuid *)h;
+       struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
 
        wait_event(mdev->misc_wait,
                   mdev->state.conn == C_WF_SYNC_UUID ||
@@ -3302,10 +3364,6 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 
        /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        /* Here the _drbd_uuid_ functions are right, current should
           _not_ be rotated into the history */
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -3324,14 +3382,14 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 enum receive_bitmap_ret { OK, DONE, FAILED };
 
 static enum receive_bitmap_ret
-receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
-       unsigned long *buffer, struct bm_xfer_ctx *c)
+receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
+                    unsigned long *buffer, struct bm_xfer_ctx *c)
 {
        unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
        unsigned want = num_words * sizeof(long);
 
-       if (want != h->length) {
-               dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
+       if (want != data_size) {
+               dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
                return FAILED;
        }
        if (want == 0)
@@ -3429,7 +3487,7 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
                const char *direction, struct bm_xfer_ctx *c)
 {
        /* what would it take to transfer it "plaintext" */
-       unsigned plain = sizeof(struct p_header) *
+       unsigned plain = sizeof(struct p_header80) *
                ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
                + c->bm_words * sizeof(long);
        unsigned total = c->bytes[0] + c->bytes[1];
@@ -3467,12 +3525,13 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
    in order to be agnostic to the 32 vs 64 bits issue.
 
    returns 0 on failure, 1 if we successfully received it. */
-static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
+static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct bm_xfer_ctx c;
        void *buffer;
        enum receive_bitmap_ret ret;
        int ok = FALSE;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
 
        wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
 
@@ -3492,21 +3551,21 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        };
 
        do {
-               if (h->command == P_BITMAP) {
-                       ret = receive_bitmap_plain(mdev, h, buffer, &c);
-               } else if (h->command == P_COMPRESSED_BITMAP) {
+               if (cmd == P_BITMAP) {
+                       ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
+               } else if (cmd == P_COMPRESSED_BITMAP) {
                        /* MAYBE: sanity check that we speak proto >= 90,
                         * and the feature is enabled! */
                        struct p_compressed_bm *p;
 
-                       if (h->length > BM_PACKET_PAYLOAD_BYTES) {
+                       if (data_size > BM_PACKET_PAYLOAD_BYTES) {
                                dev_err(DEV, "ReportCBitmap packet too large\n");
                                goto out;
                        }
                        /* use the page buff */
                        p = buffer;
                        memcpy(p, h, sizeof(*h));
-                       if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
+                       if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
                                goto out;
                        if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
                                dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
@@ -3514,17 +3573,17 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
                        }
                        ret = decode_bitmap_c(mdev, p, &c);
                } else {
-                       dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
+                       dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
                        goto out;
                }
 
-               c.packets[h->command == P_BITMAP]++;
-               c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
+               c.packets[cmd == P_BITMAP]++;
+               c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
 
                if (ret != OK)
                        break;
 
-               if (!drbd_recv_header(mdev, h))
+               if (!drbd_recv_header(mdev, &cmd, &data_size))
                        goto out;
        } while (ret == OK);
        if (ret == FAILED)
@@ -3555,16 +3614,16 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
+static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        /* TODO zero copy sink :) */
        static char sink[128];
        int size, want, r;
 
        dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
-            h->command, h->length);
+                cmd, data_size);
 
-       size = h->length;
+       size = data_size;
        while (size > 0) {
                want = min_t(int, size, sizeof(sink));
                r = drbd_recv(mdev, sink, want);
@@ -3574,7 +3633,7 @@ static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
        return size == 0;
 }
 
-static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
+static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        if (mdev->state.disk >= D_INCONSISTENT)
                drbd_kick_lo(mdev);
@@ -3586,194 +3645,91 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static void timeval_sub_us(struct timeval* tv, unsigned int us)
-{
-       tv->tv_sec -= us / 1000000;
-       us = us % 1000000;
-       if (tv->tv_usec > us) {
-               tv->tv_usec += 1000000;
-               tv->tv_sec--;
-       }
-       tv->tv_usec -= us;
-}
-
-static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
-{
-       struct delay_probe *dp;
-       struct list_head *le;
-       struct timeval now;
-       int seq_num;
-       int offset;
-       int data_delay;
-
-       seq_num = be32_to_cpu(p->seq_num);
-       offset  = be32_to_cpu(p->offset);
-
-       spin_lock(&mdev->peer_seq_lock);
-       if (!list_empty(&mdev->delay_probes)) {
-               if (from == USE_DATA_SOCKET)
-                       le = mdev->delay_probes.next;
-               else
-                       le = mdev->delay_probes.prev;
-
-               dp = list_entry(le, struct delay_probe, list);
-
-               if (dp->seq_num == seq_num) {
-                       list_del(le);
-                       spin_unlock(&mdev->peer_seq_lock);
-                       do_gettimeofday(&now);
-                       timeval_sub_us(&now, offset);
-                       data_delay =
-                               now.tv_usec - dp->time.tv_usec +
-                               (now.tv_sec - dp->time.tv_sec) * 1000000;
-
-                       if (data_delay > 0)
-                               mdev->data_delay = data_delay;
-
-                       kfree(dp);
-                       return;
-               }
-
-               if (dp->seq_num > seq_num) {
-                       spin_unlock(&mdev->peer_seq_lock);
-                       dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
-                       return; /* Do not alloca a struct delay_probe.... */
-               }
-       }
-       spin_unlock(&mdev->peer_seq_lock);
-
-       dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
-       if (!dp) {
-               dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
-               return;
-       }
-
-       dp->seq_num = seq_num;
-       do_gettimeofday(&dp->time);
-       timeval_sub_us(&dp->time, offset);
-
-       spin_lock(&mdev->peer_seq_lock);
-       if (from == USE_DATA_SOCKET)
-               list_add(&dp->list, &mdev->delay_probes);
-       else
-               list_add_tail(&dp->list, &mdev->delay_probes);
-       spin_unlock(&mdev->peer_seq_lock);
-}
-
-static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
-{
-       struct p_delay_probe *p = (struct p_delay_probe *)h;
-
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
+typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
 
-       got_delay_probe(mdev, USE_DATA_SOCKET, p);
-       return TRUE;
-}
+struct data_cmd {
+       int expect_payload;
+       size_t pkt_size;
+       drbd_cmd_handler_f function;
+};
 
-typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
-
-static drbd_cmd_handler_f drbd_default_handler[] = {
-       [P_DATA]            = receive_Data,
-       [P_DATA_REPLY]      = receive_DataReply,
-       [P_RS_DATA_REPLY]   = receive_RSDataReply,
-       [P_BARRIER]         = receive_Barrier,
-       [P_BITMAP]          = receive_bitmap,
-       [P_COMPRESSED_BITMAP]    = receive_bitmap,
-       [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
-       [P_DATA_REQUEST]    = receive_DataRequest,
-       [P_RS_DATA_REQUEST] = receive_DataRequest,
-       [P_SYNC_PARAM]      = receive_SyncParam,
-       [P_SYNC_PARAM89]           = receive_SyncParam,
-       [P_PROTOCOL]        = receive_protocol,
-       [P_UUIDS]           = receive_uuids,
-       [P_SIZES]           = receive_sizes,
-       [P_STATE]           = receive_state,
-       [P_STATE_CHG_REQ]   = receive_req_state,
-       [P_SYNC_UUID]       = receive_sync_uuid,
-       [P_OV_REQUEST]      = receive_DataRequest,
-       [P_OV_REPLY]        = receive_DataRequest,
-       [P_CSUM_RS_REQUEST]    = receive_DataRequest,
-       [P_DELAY_PROBE]     = receive_delay_probe,
+static struct data_cmd drbd_cmd_handler[] = {
+       [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
+       [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
+       [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
+       [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
+       [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
+       [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
+       [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
+       [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
+       [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
+       [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
+       [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
+       [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
        /* anything missing from this table is in
         * the asender_tbl, see get_asender_cmd */
-       [P_MAX_CMD]         = NULL,
+       [P_MAX_CMD]         = { 0, 0, NULL },
 };
 
-static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
-static drbd_cmd_handler_f *drbd_opt_cmd_handler;
+/* All handler functions that expect a sub-header get that sub-header in
+   mdev->data.rbuf.header.head.payload.
+
+   Usually in mdev->data.rbuf.header.head the callback can find the usual
+   p_header, but they may not rely on that, since there is also p_header95!
+ */
 
 static void drbdd(struct drbd_conf *mdev)
 {
-       drbd_cmd_handler_f handler;
-       struct p_header *header = &mdev->data.rbuf.header;
+       union p_header *header = &mdev->data.rbuf.header;
+       unsigned int packet_size;
+       enum drbd_packets cmd;
+       size_t shs; /* sub header size */
+       int rv;
 
        while (get_t_state(&mdev->receiver) == Running) {
                drbd_thread_current_set_cpu(mdev);
-               if (!drbd_recv_header(mdev, header)) {
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
-               }
+               if (!drbd_recv_header(mdev, &cmd, &packet_size))
+                       goto err_out;
 
-               if (header->command < P_MAX_CMD)
-                       handler = drbd_cmd_handler[header->command];
-               else if (P_MAY_IGNORE < header->command
-                    && header->command < P_MAX_OPT_CMD)
-                       handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
-               else if (header->command > P_MAX_OPT_CMD)
-                       handler = receive_skip;
-               else
-                       handler = NULL;
+               if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
+                       dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
+                       goto err_out;
+               }
 
-               if (unlikely(!handler)) {
-                       dev_err(DEV, "unknown packet type %d, l: %d!\n",
-                           header->command, header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+               shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
+               rv = drbd_recv(mdev, &header->h80.payload, shs);
+               if (unlikely(rv != shs)) {
+                       dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
+                       goto err_out;
                }
-               if (unlikely(!handler(mdev, header))) {
-                       dev_err(DEV, "error receiving %s, l: %d!\n",
-                           cmdname(header->command), header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+
+               if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
+                       dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
+                       goto err_out;
                }
-       }
-}
 
-static void drbd_fail_pending_reads(struct drbd_conf *mdev)
-{
-       struct hlist_head *slot;
-       struct hlist_node *pos;
-       struct hlist_node *tmp;
-       struct drbd_request *req;
-       int i;
+               rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
 
-       /*
-        * Application READ requests
-        */
-       spin_lock_irq(&mdev->req_lock);
-       for (i = 0; i < APP_R_HSIZE; i++) {
-               slot = mdev->app_reads_hash+i;
-               hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
-                       /* it may (but should not any longer!)
-                        * be on the work queue; if that assert triggers,
-                        * we need to also grab the
-                        * spin_lock_irq(&mdev->data.work.q_lock);
-                        * and list_del_init here. */
-                       D_ASSERT(list_empty(&req->w.list));
-                       /* It would be nice to complete outside of spinlock.
-                        * But this is easier for now. */
-                       _req_mod(req, connection_lost_while_pending);
+               if (unlikely(!rv)) {
+                       dev_err(DEV, "error receiving %s, l: %d!\n",
+                           cmdname(cmd), packet_size);
+                       goto err_out;
                }
        }
-       for (i = 0; i < APP_R_HSIZE; i++)
-               if (!hlist_empty(mdev->app_reads_hash+i))
-                       dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
-                               "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
 
-       memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
-       spin_unlock_irq(&mdev->req_lock);
+       if (0) {
+       err_out:
+               drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
+       }
 }
 
 void drbd_flush_workqueue(struct drbd_conf *mdev)
@@ -3786,6 +3742,36 @@ void drbd_flush_workqueue(struct drbd_conf *mdev)
        wait_for_completion(&barr.done);
 }
 
+void drbd_free_tl_hash(struct drbd_conf *mdev)
+{
+       struct hlist_head *h;
+
+       spin_lock_irq(&mdev->req_lock);
+
+       if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
+               spin_unlock_irq(&mdev->req_lock);
+               return;
+       }
+       /* paranoia code */
+       for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
+                               (int)(h - mdev->ee_hash), h->first);
+       kfree(mdev->ee_hash);
+       mdev->ee_hash = NULL;
+       mdev->ee_hash_s = 0;
+
+       /* paranoia code */
+       for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
+                               (int)(h - mdev->tl_hash), h->first);
+       kfree(mdev->tl_hash);
+       mdev->tl_hash = NULL;
+       mdev->tl_hash_s = 0;
+       spin_unlock_irq(&mdev->req_lock);
+}
+
 static void drbd_disconnect(struct drbd_conf *mdev)
 {
        enum drbd_fencing_p fp;
@@ -3803,6 +3789,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        drbd_thread_stop(&mdev->asender);
        drbd_free_sock(mdev);
 
+       /* wait for current activity to cease. */
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
        _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
@@ -3827,7 +3814,6 @@ static void drbd_disconnect(struct drbd_conf *mdev)
 
        /* make sure syncer is stopped and w_resume_next_sg queued */
        del_timer_sync(&mdev->resync_timer);
-       set_bit(STOP_SYNC_TIMER, &mdev->flags);
        resync_timer_fn((unsigned long)mdev);
 
        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
@@ -3845,8 +3831,6 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        if (!mdev->state.susp)
                tl_clear(mdev);
 
-       drbd_fail_pending_reads(mdev);
-
        dev_info(DEV, "Connection closed\n");
 
        drbd_md_sync(mdev);
@@ -3857,12 +3841,8 @@ static void drbd_disconnect(struct drbd_conf *mdev)
                put_ldev(mdev);
        }
 
-       if (mdev->state.role == R_PRIMARY) {
-               if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
-                       enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
-                       drbd_request_state(mdev, NS(pdsk, nps));
-               }
-       }
+       if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
+               drbd_try_outdate_peer_async(mdev);
 
        spin_lock_irq(&mdev->req_lock);
        os = mdev->state;
@@ -3875,32 +3855,14 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        if (os.conn == C_DISCONNECTING) {
-               struct hlist_head *h;
-               wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
-
-               /* we must not free the tl_hash
-                * while application io is still on the fly */
-               wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
+               wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
 
-               spin_lock_irq(&mdev->req_lock);
-               /* paranoia code */
-               for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
-                                               (int)(h - mdev->ee_hash), h->first);
-               kfree(mdev->ee_hash);
-               mdev->ee_hash = NULL;
-               mdev->ee_hash_s = 0;
-
-               /* paranoia code */
-               for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
-                                               (int)(h - mdev->tl_hash), h->first);
-               kfree(mdev->tl_hash);
-               mdev->tl_hash = NULL;
-               mdev->tl_hash_s = 0;
-               spin_unlock_irq(&mdev->req_lock);
+               if (!mdev->state.susp) {
+                       /* we must not free the tl_hash
+                        * while application io is still on the fly */
+                       wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
+                       drbd_free_tl_hash(mdev);
+               }
 
                crypto_free_hash(mdev->cram_hmac_tfm);
                mdev->cram_hmac_tfm = NULL;
@@ -3920,6 +3882,9 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        i = drbd_release_ee(mdev, &mdev->net_ee);
        if (i)
                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
+       i = atomic_read(&mdev->pp_in_use_by_net);
+       if (i)
+               dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
        i = atomic_read(&mdev->pp_in_use);
        if (i)
                dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
@@ -3963,7 +3928,7 @@ static int drbd_send_handshake(struct drbd_conf *mdev)
        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
        ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
-                            (struct p_header *)p, sizeof(*p), 0 );
+                            (struct p_header80 *)p, sizeof(*p), 0 );
        mutex_unlock(&mdev->data.mutex);
        return ok;
 }
@@ -3979,27 +3944,28 @@ static int drbd_do_handshake(struct drbd_conf *mdev)
 {
        /* ASSERT current == mdev->receiver ... */
        struct p_handshake *p = &mdev->data.rbuf.handshake;
-       const int expect = sizeof(struct p_handshake)
-                         -sizeof(struct p_header);
+       const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
+       unsigned int length;
+       enum drbd_packets cmd;
        int rv;
 
        rv = drbd_send_handshake(mdev);
        if (!rv)
                return 0;
 
-       rv = drbd_recv_header(mdev, &p->head);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                return 0;
 
-       if (p->head.command != P_HAND_SHAKE) {
+       if (cmd != P_HAND_SHAKE) {
                dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
-                    cmdname(p->head.command), p->head.command);
+                    cmdname(cmd), cmd);
                return -1;
        }
 
-       if (p->head.length != expect) {
+       if (length != expect) {
                dev_err(DEV, "expected HandShake length: %u, received: %u\n",
-                    expect, p->head.length);
+                    expect, length);
                return -1;
        }
 
@@ -4057,10 +4023,11 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        char *response = NULL;
        char *right_response = NULL;
        char *peers_ch = NULL;
-       struct p_header p;
        unsigned int key_len = strlen(mdev->net_conf->shared_secret);
        unsigned int resp_size;
        struct hash_desc desc;
+       enum drbd_packets cmd;
+       unsigned int length;
        int rv;
 
        desc.tfm = mdev->cram_hmac_tfm;
@@ -4080,33 +4047,33 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_CHALLENGE) {
+       if (cmd != P_AUTH_CHALLENGE) {
                dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                   cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length > CHALLENGE_LEN*2) {
+       if (length > CHALLENGE_LEN * 2) {
                dev_err(DEV, "expected AuthChallenge payload too big.\n");
                rv = -1;
                goto fail;
        }
 
-       peers_ch = kmalloc(p.length, GFP_NOIO);
+       peers_ch = kmalloc(length, GFP_NOIO);
        if (peers_ch == NULL) {
                dev_err(DEV, "kmalloc of peers_ch failed\n");
                rv = -1;
                goto fail;
        }
 
-       rv = drbd_recv(mdev, peers_ch, p.length);
+       rv = drbd_recv(mdev, peers_ch, length);
 
-       if (rv != p.length) {
+       if (rv != length) {
                dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
                rv = 0;
                goto fail;
@@ -4121,7 +4088,7 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        }
 
        sg_init_table(&sg, 1);
-       sg_set_buf(&sg, peers_ch, p.length);
+       sg_set_buf(&sg, peers_ch, length);
 
        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
        if (rv) {
@@ -4134,18 +4101,18 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_RESPONSE) {
+       if (cmd != P_AUTH_RESPONSE) {
                dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                       cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length != resp_size) {
+       if (length != resp_size) {
                dev_err(DEV, "expected AuthResponse payload of wrong size\n");
                rv = 0;
                goto fail;
@@ -4230,7 +4197,7 @@ int drbdd_init(struct drbd_thread *thi)
 
 /* ********* acknowledge sender ******** */
 
-static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_req_state_reply *p = (struct p_req_state_reply *)h;
 
@@ -4248,13 +4215,13 @@ static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
+static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
 {
        return drbd_send_ping_ack(mdev);
 
 }
 
-static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        /* restore idle timeout */
        mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
@@ -4264,7 +4231,7 @@ static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
+static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4279,6 +4246,7 @@ static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
        /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
        mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
        dec_rs_pending(mdev);
+       atomic_add(blksize >> 9, &mdev->rs_sect_in);
 
        return TRUE;
 }
@@ -4334,7 +4302,7 @@ static int validate_req_change_req_state(struct drbd_conf *mdev,
        return TRUE;
 }
 
-static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4374,7 +4342,7 @@ static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , what);
 }
 
-static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4394,7 +4362,7 @@ static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4407,7 +4375,7 @@ static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
                _ar_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        sector_t sector;
        int size;
@@ -4429,7 +4397,7 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_barrier_ack *p = (struct p_barrier_ack *)h;
 
@@ -4438,7 +4406,7 @@ static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
+static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        struct drbd_work *w;
@@ -4472,17 +4440,14 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
+static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
 {
-       struct p_delay_probe *p = (struct p_delay_probe *)h;
-
-       got_delay_probe(mdev, USE_META_SOCKET, p);
        return TRUE;
 }
 
 struct asender_cmd {
        size_t pkt_size;
-       int (*process)(struct drbd_conf *mdev, struct p_header *h);
+       int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
 };
 
 static struct asender_cmd *get_asender_cmd(int cmd)
@@ -4491,8 +4456,8 @@ static struct asender_cmd *get_asender_cmd(int cmd)
                /* anything missing from this table is in
                 * the drbd_cmd_handler (drbd_default_handler) table,
                 * see the beginning of drbdd() */
-       [P_PING]            = { sizeof(struct p_header), got_Ping },
-       [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
+       [P_PING]            = { sizeof(struct p_header80), got_Ping },
+       [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
@@ -4504,7 +4469,7 @@ static struct asender_cmd *get_asender_cmd(int cmd)
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
-       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
+       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_MAX_CMD]         = { 0, NULL },
        };
        if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
@@ -4515,13 +4480,13 @@ static struct asender_cmd *get_asender_cmd(int cmd)
 int drbd_asender(struct drbd_thread *thi)
 {
        struct drbd_conf *mdev = thi->mdev;
-       struct p_header *h = &mdev->meta.rbuf.header;
+       struct p_header80 *h = &mdev->meta.rbuf.header.h80;
        struct asender_cmd *cmd = NULL;
 
        int rv, len;
        void *buf    = h;
        int received = 0;
-       int expect   = sizeof(struct p_header);
+       int expect   = sizeof(struct p_header80);
        int empty;
 
        sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
@@ -4621,7 +4586,7 @@ int drbd_asender(struct drbd_thread *thi)
                                goto disconnect;
                        }
                        expect = cmd->pkt_size;
-                       ERR_IF(len != expect-sizeof(struct p_header))
+                       ERR_IF(len != expect-sizeof(struct p_header80))
                                goto reconnect;
                }
                if (received == expect) {
@@ -4631,7 +4596,7 @@ int drbd_asender(struct drbd_thread *thi)
 
                        buf      = h;
                        received = 0;
-                       expect   = sizeof(struct p_header);
+                       expect   = sizeof(struct p_header80);
                        cmd      = NULL;
                }
        }
This page took 0.050149 seconds and 5 git commands to generate.