Merge tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client
author Linus Torvalds <torvalds@linux-foundation.org>
Tue, 2 Aug 2016 23:39:09 +0000 (19:39 -0400)
committer Linus Torvalds <torvalds@linux-foundation.org>
Tue, 2 Aug 2016 23:39:09 +0000 (19:39 -0400)
Pull Ceph updates from Ilya Dryomov:
 "The highlights are:

   - RADOS namespace support in libceph and CephFS (Zheng Yan and
     myself).  The stopgaps added in 4.5 to deny access to inodes in
     namespaces are removed and the CEPH_FEATURE_FS_FILE_LAYOUT_V2
     feature bit is now fully supported

   - A large rework of the MDS cap flushing code (Zheng Yan)

   - Handle some of ->d_revalidate() in RCU mode (Jeff Layton).  We were
     overly pessimistic before, bailing at the first sight of LOOKUP_RCU

  On top of that we've got a few CephFS bug fixes, a couple of cleanups
  and Arnd's workaround for a weird genksyms issue"
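
A note on the layout plumbing behind the first highlight: the in-core
struct ceph_file_layout now carries host-endian fields plus an
RCU-managed, refcounted namespace string. The sketch below is
reconstructed from the diffs that follow (the field names match the
diff; treat the exact struct definition as an approximation):

	struct ceph_file_layout {
		u32 stripe_unit;                   /* stripe unit, in bytes */
		u32 stripe_count;                  /* stripe across this many objects */
		u32 object_size;                   /* until objects reach this size */
		s64 pool_id;                       /* RADOS pool id */
		struct ceph_string __rcu *pool_ns; /* RADOS pool namespace */
	};

	/* writer side, as in fill_inode() below: swap the namespace
	 * under i_ceph_lock, drop the old reference afterwards */
	spin_lock(&ci->i_ceph_lock);
	old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
				lockdep_is_held(&ci->i_ceph_lock));
	rcu_assign_pointer(ci->i_layout.pool_ns, new_ns);
	spin_unlock(&ci->i_ceph_lock);
	ceph_put_string(old_ns);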

* tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client: (34 commits)
  ceph: fix symbol versioning for ceph_monc_do_statfs
  ceph: Correctly return NXIO errors from ceph_llseek
  ceph: Mark the file cache as unreclaimable
  ceph: optimize cap flush waiting
  ceph: cleanup ceph_flush_snaps()
  ceph: kick cap flushes before sending other cap message
  ceph: introduce an inode flag to indicate if snapflush is needed
  ceph: avoid sending duplicated cap flush message
  ceph: unify cap flush and snapcap flush
  ceph: use list instead of rbtree to track cap flushes
  ceph: update types of some local variables
  ceph: include 'follows' of pending snapflush in cap reconnect message
  ceph: update cap reconnect message to version 3
  ceph: mount non-default filesystem by name
  libceph: fsmap.user subscription support
  ceph: handle LOOKUP_RCU in ceph_d_revalidate
  ceph: allow dentry_lease_is_valid to work under RCU walk
  ceph: clear d_fsinfo pointer under d_lock
  ceph: remove ceph_mdsc_lease_release
  ceph: don't use ->d_time
  ...
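
A note on the cap flush rework in the list above: both the per-inode
i_cap_flush_tree and the global cap_flush_tree rbtrees become plain
lists. Flush tids come from a monotonically increasing counter
(mdsc->last_cap_flush_tid) and entries are queued in tid order, so the
lists stay sorted by construction and the oldest in-flight flush is
simply the head. A minimal sketch of the check the sync path now makes
(paraphrasing check_caps_flush() from the mds_client.c diff below):

	static bool flushed_through(struct ceph_mds_client *mdsc,
				    u64 want_flush_tid)
	{
		bool ret = true;

		spin_lock(&mdsc->cap_dirty_lock);
		if (!list_empty(&mdsc->cap_flush_list)) {
			/* ordered by tid, so the head is the oldest flush */
			struct ceph_cap_flush *cf =
				list_first_entry(&mdsc->cap_flush_list,
						 struct ceph_cap_flush, g_list);
			if (cf->tid <= want_flush_tid)
				ret = false;	/* still flushing */
		}
		spin_unlock(&mdsc->cap_dirty_lock);
		return ret;
	}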

drivers/block/rbd.c
fs/ceph/inode.c
fs/ceph/mds_client.c
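
On the ->d_revalidate() change: under LOOKUP_RCU a d_revalidate
implementation may only inspect RCU-safe state and must bail with
-ECHILD rather than sleep or take sleeping locks, which makes the VFS
retry in ref-walk mode. A generic sketch of that shape (the lease-check
helpers are hypothetical, named only for illustration; see the
ceph_d_revalidate commit for the real logic):

	static int example_d_revalidate(struct dentry *dentry, unsigned int flags)
	{
		if (flags & LOOKUP_RCU) {
			/* lockless path: no blocking, no refcounts */
			if (!lease_valid_rcu(dentry))	/* hypothetical */
				return -ECHILD;		/* fall back to ref-walk */
			return 1;
		}

		spin_lock(&dentry->d_lock);
		if (!lease_valid(dentry)) {		/* hypothetical */
			spin_unlock(&dentry->d_lock);
			return 0;			/* invalid: drop dentry */
		}
		spin_unlock(&dentry->d_lock);
		return 1;
	}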

diff --combined drivers/block/rbd.c
index 450662055d97338996720d4aa93efa0aefc16e65,58fd02d4e534b6bbbf7654c1280a7ed75cc7b4b0..1a04af6d24212cdd16e5c8091d01f16e2b409384
@@@ -1937,7 -1937,7 +1937,7 @@@ static struct ceph_osd_request *rbd_osd
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;
  
-       osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+       osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
        if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
                             obj_request->object_name))
                goto fail;
@@@ -1991,7 -1991,7 +1991,7 @@@ rbd_osd_req_create_copyup(struct rbd_ob
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;
  
-       osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+       osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
        if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
                             obj_request->object_name))
                goto fail;
@@@ -3286,9 -3286,9 +3286,9 @@@ static void rbd_queue_workfn(struct wor
                goto err;
        }
  
 -      if (rq->cmd_flags & REQ_DISCARD)
 +      if (req_op(rq) == REQ_OP_DISCARD)
                op_type = OBJ_OP_DISCARD;
 -      else if (rq->cmd_flags & REQ_WRITE)
 +      else if (req_op(rq) == REQ_OP_WRITE)
                op_type = OBJ_OP_WRITE;
        else
                op_type = OBJ_OP_READ;
@@@ -3995,10 -3995,11 +3995,11 @@@ static struct rbd_device *rbd_dev_creat
  
        /* Initialize the layout used for all rbd requests */
  
-       rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
-       rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
+       rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
+       rbd_dev->layout.stripe_count = 1;
+       rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
+       rbd_dev->layout.pool_id = spec->pool_id;
+       RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
  
        /*
         * If this is a mapping rbd_dev (as opposed to a parent one),
@@@ -5187,7 -5188,7 +5188,7 @@@ static int rbd_dev_header_name(struct r
  
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
  
-       rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+       rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
        if (rbd_dev->image_format == 1)
                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
                                       spec->image_name, RBD_SUFFIX);
diff --combined fs/ceph/inode.c
index 99bdef66213a662a64dc8b14823c72e3d361a0ca,fd85b3c589609fcbf4fc73dc717629f4b38f4ab2..dd3a6dbf71ebcd8903ca6a04b2ba91fa24893baa
@@@ -446,7 -446,7 +446,7 @@@ struct inode *ceph_alloc_inode(struct s
        ci->i_symlink = NULL;
  
        memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
-       ci->i_pool_ns_len = 0;
+       RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
  
        ci->i_fragtree = RB_ROOT;
        mutex_init(&ci->i_fragtree_mutex);
        INIT_LIST_HEAD(&ci->i_dirty_item);
        INIT_LIST_HEAD(&ci->i_flushing_item);
        ci->i_prealloc_cap_flush = NULL;
-       ci->i_cap_flush_tree = RB_ROOT;
+       INIT_LIST_HEAD(&ci->i_cap_flush_list);
        init_waitqueue_head(&ci->i_cap_wq);
        ci->i_hold_caps_min = 0;
        ci->i_hold_caps_max = 0;
        ci->i_head_snapc = NULL;
        ci->i_snap_caps = 0;
  
-       for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
+       for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
                ci->i_nr_by_mode[i] = 0;
  
        mutex_init(&ci->i_truncate_mutex);
@@@ -570,6 -570,8 +570,8 @@@ void ceph_destroy_inode(struct inode *i
        if (ci->i_xattrs.prealloc_blob)
                ceph_buffer_put(ci->i_xattrs.prealloc_blob);
  
+       ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
        call_rcu(&inode->i_rcu, ceph_i_callback);
  }
  
@@@ -583,6 -585,14 +585,14 @@@ int ceph_drop_inode(struct inode *inode
        return 1;
  }
  
+ void ceph_evict_inode(struct inode *inode)
+ {
+       /* wait unsafe sync writes */
+       ceph_sync_write_wait(inode);
+       truncate_inode_pages_final(&inode->i_data);
+       clear_inode(inode);
+ }
  static inline blkcnt_t calc_inode_blocks(u64 size)
  {
        return (size + (1<<9) - 1) >> 9;
@@@ -733,6 -743,7 +743,7 @@@ static int fill_inode(struct inode *ino
        int issued = 0, implemented, new_issued;
        struct timespec mtime, atime, ctime;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_string *pool_ns = NULL;
        struct ceph_cap *new_cap = NULL;
        int err = 0;
        bool wake = false;
                               iinfo->xattr_len);
        }
  
+       if (iinfo->pool_ns_len > 0)
+               pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
+                                                    iinfo->pool_ns_len);
        spin_lock(&ci->i_ceph_lock);
  
        /*
  
        if (new_version ||
            (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
-               if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
+               s64 old_pool = ci->i_layout.pool_id;
+               struct ceph_string *old_ns;
+               ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
+               old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+                                       lockdep_is_held(&ci->i_ceph_lock));
+               rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
+               if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
                        ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
-               ci->i_layout = info->layout;
-               ci->i_pool_ns_len = iinfo->pool_ns_len;
+               pool_ns = old_ns;
  
                queue_trunc = ceph_fill_file_size(inode, issued,
                                        le32_to_cpu(info->truncate_seq),
@@@ -985,6 -1008,7 +1008,7 @@@ out
                ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
+       ceph_put_string(pool_ns);
        return err;
  }
  
@@@ -1018,7 -1042,7 +1042,7 @@@ static void update_dentry_lease(struct 
                goto out_unlock;
  
        if (di->lease_gen == session->s_cap_gen &&
-           time_before(ttl, dentry->d_time))
+           time_before(ttl, di->time))
                goto out_unlock;  /* we already have a newer lease. */
  
        if (di->lease_session && di->lease_session != session)
        di->lease_seq = le32_to_cpu(lease->seq);
        di->lease_renew_after = half_ttl;
        di->lease_renew_from = 0;
-       dentry->d_time = ttl;
+       di->time = ttl;
  out_unlock:
        spin_unlock(&dentry->d_lock);
        return;
@@@ -1164,7 -1188,7 +1188,7 @@@ int ceph_fill_trace(struct super_block 
  
                        dname.name = rinfo->dname;
                        dname.len = rinfo->dname_len;
 -                      dname.hash = full_name_hash(dname.name, dname.len);
 +                      dname.hash = full_name_hash(parent, dname.name, dname.len);
                        vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
                        vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
  retry_lookup:
@@@ -1508,7 -1532,7 +1532,7 @@@ int ceph_readdir_prepopulate(struct cep
  
                dname.name = rde->name;
                dname.len = rde->name_len;
 -              dname.hash = full_name_hash(dname.name, dname.len);
 +              dname.hash = full_name_hash(parent, dname.name, dname.len);
  
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
diff --combined fs/ceph/mds_client.c
index 4e8678a612b6ffc4465ff90fc8bc12e9f93f465d,cdc6a17f58670222a1adbae58d3a36a57f04ef80..fa59a85226b262f2fe086ec5dfc1bf6813711986
@@@ -48,7 -48,7 +48,7 @@@
  struct ceph_reconnect_state {
        int nr_caps;
        struct ceph_pagelist *pagelist;
-       bool flock;
+       unsigned msg_version;
  };
  
  static void __wake_requests(struct ceph_mds_client *mdsc,
@@@ -100,12 -100,15 +100,15 @@@ static int parse_reply_info_in(void **p
        } else
                info->inline_version = CEPH_INLINE_NONE;
  
+       info->pool_ns_len = 0;
+       info->pool_ns_data = NULL;
        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
-               ceph_decode_need(p, end, info->pool_ns_len, bad);
-               *p += info->pool_ns_len;
-       } else {
-               info->pool_ns_len = 0;
+               if (info->pool_ns_len > 0) {
+                       ceph_decode_need(p, end, info->pool_ns_len, bad);
+                       info->pool_ns_data = *p;
+                       *p += info->pool_ns_len;
+               }
        }
  
        return 0;
@@@ -469,7 -472,6 +472,6 @@@ static struct ceph_mds_session *registe
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_flushing);
-       INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
  
        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
@@@ -1145,19 -1147,17 +1147,17 @@@ static int remove_session_caps_cb(struc
                    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                        invalidate = true;
  
-               while (true) {
-                       struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
-                       if (!n)
-                               break;
-                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
-                       rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
-                       list_add(&cf->list, &to_remove);
+               while (!list_empty(&ci->i_cap_flush_list)) {
+                       cf = list_first_entry(&ci->i_cap_flush_list,
+                                             struct ceph_cap_flush, i_list);
+                       list_del(&cf->i_list);
+                       list_add(&cf->i_list, &to_remove);
                }
  
                spin_lock(&mdsc->cap_dirty_lock);
  
-               list_for_each_entry(cf, &to_remove, list)
-                       rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+               list_for_each_entry(cf, &to_remove, i_list)
+                       list_del(&cf->g_list);
  
                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
                spin_unlock(&mdsc->cap_dirty_lock);
  
                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
-                       list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+                       list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
        }
        while (!list_empty(&to_remove)) {
                struct ceph_cap_flush *cf;
                cf = list_first_entry(&to_remove,
-                                     struct ceph_cap_flush, list);
-               list_del(&cf->list);
+                                     struct ceph_cap_flush, i_list);
+               list_del(&cf->i_list);
                ceph_free_cap_flush(cf);
        }
  
@@@ -1212,6 -1212,8 +1212,8 @@@ static void remove_session_caps(struct 
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, fsc);
  
+       wake_up_all(&fsc->mdsc->cap_flushing_wq);
        spin_lock(&session->s_cap_lock);
        if (session->s_nr_caps > 0) {
                struct inode *inode;
@@@ -1478,35 -1480,21 +1480,21 @@@ static int trim_caps(struct ceph_mds_cl
        return 0;
  }
  
- static int check_capsnap_flush(struct ceph_inode_info *ci,
-                              u64 want_snap_seq)
- {
-       int ret = 1;
-       spin_lock(&ci->i_ceph_lock);
-       if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
-               struct ceph_cap_snap *capsnap =
-                       list_first_entry(&ci->i_cap_snaps,
-                                        struct ceph_cap_snap, ci_item);
-               ret = capsnap->follows >= want_snap_seq;
-       }
-       spin_unlock(&ci->i_ceph_lock);
-       return ret;
- }
  static int check_caps_flush(struct ceph_mds_client *mdsc,
                            u64 want_flush_tid)
  {
-       struct rb_node *n;
-       struct ceph_cap_flush *cf;
        int ret = 1;
  
        spin_lock(&mdsc->cap_dirty_lock);
-       n = rb_first(&mdsc->cap_flush_tree);
-       cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
-       if (cf && cf->tid <= want_flush_tid) {
-               dout("check_caps_flush still flushing tid %llu <= %llu\n",
-                    cf->tid, want_flush_tid);
-               ret = 0;
+       if (!list_empty(&mdsc->cap_flush_list)) {
+               struct ceph_cap_flush *cf =
+                       list_first_entry(&mdsc->cap_flush_list,
+                                        struct ceph_cap_flush, g_list);
+               if (cf->tid <= want_flush_tid) {
+                       dout("check_caps_flush still flushing tid "
+                            "%llu <= %llu\n", cf->tid, want_flush_tid);
+                       ret = 0;
+               }
        }
        spin_unlock(&mdsc->cap_dirty_lock);
        return ret;
   * returns true if we've flushed through want_flush_tid
   */
  static void wait_caps_flush(struct ceph_mds_client *mdsc,
-                           u64 want_flush_tid, u64 want_snap_seq)
+                           u64 want_flush_tid)
  {
-       int mds;
-       dout("check_caps_flush want %llu snap want %llu\n",
-            want_flush_tid, want_snap_seq);
-       mutex_lock(&mdsc->mutex);
-       for (mds = 0; mds < mdsc->max_sessions; ) {
-               struct ceph_mds_session *session = mdsc->sessions[mds];
-               struct inode *inode = NULL;
-               if (!session) {
-                       mds++;
-                       continue;
-               }
-               get_session(session);
-               mutex_unlock(&mdsc->mutex);
-               mutex_lock(&session->s_mutex);
-               if (!list_empty(&session->s_cap_snaps_flushing)) {
-                       struct ceph_cap_snap *capsnap =
-                               list_first_entry(&session->s_cap_snaps_flushing,
-                                                struct ceph_cap_snap,
-                                                flushing_item);
-                       struct ceph_inode_info *ci = capsnap->ci;
-                       if (!check_capsnap_flush(ci, want_snap_seq)) {
-                               dout("check_cap_flush still flushing snap %p "
-                                    "follows %lld <= %lld to mds%d\n",
-                                    &ci->vfs_inode, capsnap->follows,
-                                    want_snap_seq, mds);
-                               inode = igrab(&ci->vfs_inode);
-                       }
-               }
-               mutex_unlock(&session->s_mutex);
-               ceph_put_mds_session(session);
-               if (inode) {
-                       wait_event(mdsc->cap_flushing_wq,
-                                  check_capsnap_flush(ceph_inode(inode),
-                                                      want_snap_seq));
-                       iput(inode);
-               } else {
-                       mds++;
-               }
-               mutex_lock(&mdsc->mutex);
-       }
-       mutex_unlock(&mdsc->mutex);
+       dout("check_caps_flush want %llu\n", want_flush_tid);
  
        wait_event(mdsc->cap_flushing_wq,
                   check_caps_flush(mdsc, want_flush_tid));
@@@ -2163,6 -2106,11 +2106,11 @@@ static int __do_request(struct ceph_mds
        mds = __choose_mds(mdsc, req);
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+               if (mdsc->mdsmap_err) {
+                       err = mdsc->mdsmap_err;
+                       dout("do_request mdsmap err %d\n", err);
+                       goto finish;
+               }
                dout("do_request no mds or not active, waiting for map\n");
                list_add(&req->r_wait, &mdsc->waiting_for_map);
                goto out;
@@@ -2292,14 -2240,6 +2240,6 @@@ int ceph_mdsc_do_request(struct ceph_md
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
  
-       /* deny access to directories with pool_ns layouts */
-       if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
-           ceph_inode(req->r_inode)->i_pool_ns_len)
-               return -EIO;
-       if (req->r_locked_dir &&
-           ceph_inode(req->r_locked_dir)->i_pool_ns_len)
-               return -EIO;
        /* issue */
        mutex_lock(&mdsc->mutex);
        __register_request(mdsc, req, dir);
@@@ -2791,13 -2731,13 +2731,13 @@@ static int encode_caps_cb(struct inode 
                struct ceph_mds_cap_reconnect v2;
                struct ceph_mds_cap_reconnect_v1 v1;
        } rec;
-       size_t reclen;
        struct ceph_inode_info *ci;
        struct ceph_reconnect_state *recon_state = arg;
        struct ceph_pagelist *pagelist = recon_state->pagelist;
        char *path;
        int pathlen, err;
        u64 pathbase;
+       u64 snap_follows;
        struct dentry *dentry;
  
        ci = cap->ci;
                path = NULL;
                pathlen = 0;
        }
-       err = ceph_pagelist_encode_string(pagelist, path, pathlen);
-       if (err)
-               goto out_free;
  
        spin_lock(&ci->i_ceph_lock);
        cap->seq = 0;        /* reset cap seq */
        cap->mseq = 0;       /* and migrate_seq */
        cap->cap_gen = cap->session->s_cap_gen;
  
-       if (recon_state->flock) {
+       if (recon_state->msg_version >= 2) {
                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
                rec.v2.issued = cpu_to_le32(cap->issued);
                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
                rec.v2.pathbase = cpu_to_le64(pathbase);
                rec.v2.flock_len = 0;
-               reclen = sizeof(rec.v2);
        } else {
                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
                rec.v1.pathbase = cpu_to_le64(pathbase);
-               reclen = sizeof(rec.v1);
+       }
+       if (list_empty(&ci->i_cap_snaps)) {
+               snap_follows = 0;
+       } else {
+               struct ceph_cap_snap *capsnap =
+                       list_first_entry(&ci->i_cap_snaps,
+                                        struct ceph_cap_snap, ci_item);
+               snap_follows = capsnap->follows;
        }
        spin_unlock(&ci->i_ceph_lock);
  
-       if (recon_state->flock) {
+       if (recon_state->msg_version >= 2) {
                int num_fcntl_locks, num_flock_locks;
                struct ceph_filelock *flocks;
+               size_t struct_len, total_len = 0;
+               u8 struct_v = 0;
  
  encode_again:
                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
                                goto encode_again;
                        goto out_free;
                }
+               if (recon_state->msg_version >= 3) {
+                       /* version, compat_version and struct_len */
+                       total_len = 2 * sizeof(u8) + sizeof(u32);
+                       struct_v = 2;
+               }
                /*
                 * number of encoded locks is stable, so copy to pagelist
                 */
-               rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
-                                   (num_fcntl_locks+num_flock_locks) *
-                                   sizeof(struct ceph_filelock));
-               err = ceph_pagelist_append(pagelist, &rec, reclen);
-               if (!err)
-                       err = ceph_locks_to_pagelist(flocks, pagelist,
-                                                    num_fcntl_locks,
-                                                    num_flock_locks);
+               struct_len = 2 * sizeof(u32) +
+                           (num_fcntl_locks + num_flock_locks) *
+                           sizeof(struct ceph_filelock);
+               rec.v2.flock_len = cpu_to_le32(struct_len);
+               struct_len += sizeof(rec.v2);
+               struct_len += sizeof(u32) + pathlen;
+               if (struct_v >= 2)
+                       struct_len += sizeof(u64); /* snap_follows */
+               total_len += struct_len;
+               err = ceph_pagelist_reserve(pagelist, total_len);
+               if (!err) {
+                       if (recon_state->msg_version >= 3) {
+                               ceph_pagelist_encode_8(pagelist, struct_v);
+                               ceph_pagelist_encode_8(pagelist, 1);
+                               ceph_pagelist_encode_32(pagelist, struct_len);
+                       }
+                       ceph_pagelist_encode_string(pagelist, path, pathlen);
+                       ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+                       ceph_locks_to_pagelist(flocks, pagelist,
+                                              num_fcntl_locks,
+                                              num_flock_locks);
+                       if (struct_v >= 2)
+                               ceph_pagelist_encode_64(pagelist, snap_follows);
+               }
                kfree(flocks);
        } else {
-               err = ceph_pagelist_append(pagelist, &rec, reclen);
+               size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
+               err = ceph_pagelist_reserve(pagelist, size);
+               if (!err) {
+                       ceph_pagelist_encode_string(pagelist, path, pathlen);
+                       ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+               }
        }
  
        recon_state->nr_caps++;
@@@ -2976,7 -2953,12 +2953,12 @@@ static void send_mds_reconnect(struct c
  
        recon_state.nr_caps = 0;
        recon_state.pagelist = pagelist;
-       recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+       if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+               recon_state.msg_version = 3;
+       else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+               recon_state.msg_version = 2;
+       else
+               recon_state.msg_version = 1;
        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
        if (err < 0)
                goto fail;
                        goto fail;
        }
  
-       if (recon_state.flock)
-               reply->hdr.version = cpu_to_le16(2);
+       reply->hdr.version = cpu_to_le16(recon_state.msg_version);
  
        /* raced with cap release? */
        if (s_nr_caps != recon_state.nr_caps) {
@@@ -3204,7 -3185,7 +3185,7 @@@ static void handle_lease(struct ceph_md
                WARN_ON(1);
                goto release;  /* hrm... */
        }
 -      dname.hash = full_name_hash(dname.name, dname.len);
 +      dname.hash = full_name_hash(parent, dname.name, dname.len);
        dentry = d_lookup(parent, &dname);
        dput(parent);
        if (!dentry)
                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
  
                        di->lease_seq = seq;
-                       dentry->d_time = di->lease_renew_from + duration;
+                       di->time = di->lease_renew_from + duration;
                        di->lease_renew_after = di->lease_renew_from +
                                (duration >> 1);
                        di->lease_renew_from = 0;
@@@ -3296,47 -3277,6 +3277,6 @@@ void ceph_mdsc_lease_send_msg(struct ce
        ceph_con_send(&session->s_con, msg);
  }
  
- /*
-  * Preemptively release a lease we expect to invalidate anyway.
-  * Pass @inode always, @dentry is optional.
-  */
- void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
-                            struct dentry *dentry)
- {
-       struct ceph_dentry_info *di;
-       struct ceph_mds_session *session;
-       u32 seq;
-       BUG_ON(inode == NULL);
-       BUG_ON(dentry == NULL);
-       /* is dentry lease valid? */
-       spin_lock(&dentry->d_lock);
-       di = ceph_dentry(dentry);
-       if (!di || !di->lease_session ||
-           di->lease_session->s_mds < 0 ||
-           di->lease_gen != di->lease_session->s_cap_gen ||
-           !time_before(jiffies, dentry->d_time)) {
-               dout("lease_release inode %p dentry %p -- "
-                    "no lease\n",
-                    inode, dentry);
-               spin_unlock(&dentry->d_lock);
-               return;
-       }
-       /* we do have a lease on this dentry; note mds and seq */
-       session = ceph_get_mds_session(di->lease_session);
-       seq = di->lease_seq;
-       __ceph_mdsc_drop_dentry_lease(dentry);
-       spin_unlock(&dentry->d_lock);
-       dout("lease_release inode %p dentry %p to mds%d\n",
-            inode, dentry, session->s_mds);
-       ceph_mdsc_lease_send_msg(session, inode, dentry,
-                                CEPH_MDS_LEASE_RELEASE, seq);
-       ceph_put_mds_session(session);
- }
  /*
   * drop all leases (and dentry refs) in preparation for umount
   */
@@@ -3470,7 -3410,7 +3410,7 @@@ int ceph_mdsc_init(struct ceph_fs_clien
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
        mdsc->last_cap_flush_tid = 1;
-       mdsc->cap_flush_tree = RB_ROOT;
+       INIT_LIST_HEAD(&mdsc->cap_flush_list);
        INIT_LIST_HEAD(&mdsc->cap_dirty);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
        mdsc->num_cap_flushing = 0;
@@@ -3585,7 -3525,7 +3525,7 @@@ restart
  
  void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
  {
-       u64 want_tid, want_flush, want_snap;
+       u64 want_tid, want_flush;
  
        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                return;
        ceph_flush_dirty_caps(mdsc);
        spin_lock(&mdsc->cap_dirty_lock);
        want_flush = mdsc->last_cap_flush_tid;
+       if (!list_empty(&mdsc->cap_flush_list)) {
+               struct ceph_cap_flush *cf =
+                       list_last_entry(&mdsc->cap_flush_list,
+                                       struct ceph_cap_flush, g_list);
+               cf->wake = true;
+       }
        spin_unlock(&mdsc->cap_dirty_lock);
  
-       down_read(&mdsc->snap_rwsem);
-       want_snap = mdsc->last_snap_seq;
-       up_read(&mdsc->snap_rwsem);
-       dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
-            want_tid, want_flush, want_snap);
+       dout("sync want tid %lld flush_seq %lld\n",
+            want_tid, want_flush);
  
        wait_unsafe_requests(mdsc, want_tid);
-       wait_caps_flush(mdsc, want_flush, want_snap);
+       wait_caps_flush(mdsc, want_flush);
  }
  
  /*
@@@ -3729,11 -3671,86 +3671,86 @@@ void ceph_mdsc_destroy(struct ceph_fs_c
        dout("mdsc_destroy %p done\n", mdsc);
  }
  
+ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+ {
+       struct ceph_fs_client *fsc = mdsc->fsc;
+       const char *mds_namespace = fsc->mount_options->mds_namespace;
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
+       u32 epoch;
+       u32 map_len;
+       u32 num_fs;
+       u32 mount_fscid = (u32)-1;
+       u8 struct_v, struct_cv;
+       int err = -EINVAL;
+       ceph_decode_need(&p, end, sizeof(u32), bad);
+       epoch = ceph_decode_32(&p);
+       dout("handle_fsmap epoch %u\n", epoch);
+       ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+       struct_v = ceph_decode_8(&p);
+       struct_cv = ceph_decode_8(&p);
+       map_len = ceph_decode_32(&p);
+       ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+       p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+       num_fs = ceph_decode_32(&p);
+       while (num_fs-- > 0) {
+               void *info_p, *info_end;
+               u32 info_len;
+               u8 info_v, info_cv;
+               u32 fscid, namelen;
+               ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+               info_v = ceph_decode_8(&p);
+               info_cv = ceph_decode_8(&p);
+               info_len = ceph_decode_32(&p);
+               ceph_decode_need(&p, end, info_len, bad);
+               info_p = p;
+               info_end = p + info_len;
+               p = info_end;
+               ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+               fscid = ceph_decode_32(&info_p);
+               namelen = ceph_decode_32(&info_p);
+               ceph_decode_need(&info_p, info_end, namelen, bad);
+               if (mds_namespace &&
+                   strlen(mds_namespace) == namelen &&
+                   !strncmp(mds_namespace, (char *)info_p, namelen)) {
+                       mount_fscid = fscid;
+                       break;
+               }
+       }
+       ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+       if (mount_fscid != (u32)-1) {
+               fsc->client->monc.fs_cluster_id = mount_fscid;
+               ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+                                  0, true);
+               ceph_monc_renew_subs(&fsc->client->monc);
+       } else {
+               err = -ENOENT;
+               goto err_out;
+       }
+       return;
+ bad:
+       pr_err("error decoding fsmap\n");
+ err_out:
+       mutex_lock(&mdsc->mutex);
+       mdsc->mdsmap_err = -ENOENT;
+       __wake_requests(mdsc, &mdsc->waiting_for_map);
+       mutex_unlock(&mdsc->mutex);
+       return;
+ }
  
  /*
   * handle mds map update.
   */
- void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
  {
        u32 epoch;
        u32 maplen;
@@@ -3840,7 -3857,10 +3857,10 @@@ static void dispatch(struct ceph_connec
  
        switch (type) {
        case CEPH_MSG_MDS_MAP:
-               ceph_mdsc_handle_map(mdsc, msg);
+               ceph_mdsc_handle_mdsmap(mdsc, msg);
+               break;
+       case CEPH_MSG_FS_MAP_USER:
+               ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);