source: src/linux/universal/linux-4.4/fs/ceph/mds_client.c @ 31662

Last change on this file since 31662 was 31662, checked in by brainslayer, 2 months ago

use new squashfs in all kernels

File size: 100.9 KB
Line 
1#include <linux/ceph/ceph_debug.h>
2
3#include <linux/fs.h>
4#include <linux/wait.h>
5#include <linux/slab.h>
6#include <linux/gfp.h>
7#include <linux/sched.h>
8#include <linux/debugfs.h>
9#include <linux/seq_file.h>
10#include <linux/utsname.h>
11#include <linux/ratelimit.h>
12
13#include "super.h"
14#include "mds_client.h"
15
16#include <linux/ceph/ceph_features.h>
17#include <linux/ceph/messenger.h>
18#include <linux/ceph/decode.h>
19#include <linux/ceph/pagelist.h>
20#include <linux/ceph/auth.h>
21#include <linux/ceph/debugfs.h>
22
23/*
24 * A cluster of MDS (metadata server) daemons is responsible for
25 * managing the file system namespace (the directory hierarchy and
26 * inodes) and for coordinating shared access to storage.  Metadata is
27 * partitioning hierarchically across a number of servers, and that
28 * partition varies over time as the cluster adjusts the distribution
29 * in order to balance load.
30 *
31 * The MDS client is primarily responsible to managing synchronous
32 * metadata requests for operations like open, unlink, and so forth.
33 * If there is a MDS failure, we find out about it when we (possibly
34 * request and) receive a new MDS map, and can resubmit affected
35 * requests.
36 *
37 * For the most part, though, we take advantage of a lossless
38 * communications channel to the MDS, and do not need to worry about
39 * timing out or resubmitting requests.
40 *
41 * We maintain a stateful "session" with each MDS we interact with.
42 * Within each session, we sent periodic heartbeat messages to ensure
43 * any capabilities or leases we have been issues remain valid.  If
44 * the session times out and goes stale, our leases and capabilities
45 * are no longer valid.
46 */
47
48struct ceph_reconnect_state {
49        int nr_caps;
50        struct ceph_pagelist *pagelist;
51        bool flock;
52};
53
54static void __wake_requests(struct ceph_mds_client *mdsc,
55                            struct list_head *head);
56
57static const struct ceph_connection_operations mds_con_ops;
58
59
60/*
61 * mds reply parsing
62 */
63
64/*
65 * parse individual inode info
66 */
67static int parse_reply_info_in(void **p, void *end,
68                               struct ceph_mds_reply_info_in *info,
69                               u64 features)
70{
71        int err = -EIO;
72
73        info->in = *p;
74        *p += sizeof(struct ceph_mds_reply_inode) +
75                sizeof(*info->in->fragtree.splits) *
76                le32_to_cpu(info->in->fragtree.nsplits);
77
78        ceph_decode_32_safe(p, end, info->symlink_len, bad);
79        ceph_decode_need(p, end, info->symlink_len, bad);
80        info->symlink = *p;
81        *p += info->symlink_len;
82
83        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
84                ceph_decode_copy_safe(p, end, &info->dir_layout,
85                                      sizeof(info->dir_layout), bad);
86        else
87                memset(&info->dir_layout, 0, sizeof(info->dir_layout));
88
89        ceph_decode_32_safe(p, end, info->xattr_len, bad);
90        ceph_decode_need(p, end, info->xattr_len, bad);
91        info->xattr_data = *p;
92        *p += info->xattr_len;
93
94        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
95                ceph_decode_64_safe(p, end, info->inline_version, bad);
96                ceph_decode_32_safe(p, end, info->inline_len, bad);
97                ceph_decode_need(p, end, info->inline_len, bad);
98                info->inline_data = *p;
99                *p += info->inline_len;
100        } else
101                info->inline_version = CEPH_INLINE_NONE;
102
103        return 0;
104bad:
105        return err;
106}
107
108/*
109 * parse a normal reply, which may contain a (dir+)dentry and/or a
110 * target inode.
111 */
112static int parse_reply_info_trace(void **p, void *end,
113                                  struct ceph_mds_reply_info_parsed *info,
114                                  u64 features)
115{
116        int err;
117
118        if (info->head->is_dentry) {
119                err = parse_reply_info_in(p, end, &info->diri, features);
120                if (err < 0)
121                        goto out_bad;
122
123                if (unlikely(*p + sizeof(*info->dirfrag) > end))
124                        goto bad;
125                info->dirfrag = *p;
126                *p += sizeof(*info->dirfrag) +
127                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
128                if (unlikely(*p > end))
129                        goto bad;
130
131                ceph_decode_32_safe(p, end, info->dname_len, bad);
132                ceph_decode_need(p, end, info->dname_len, bad);
133                info->dname = *p;
134                *p += info->dname_len;
135                info->dlease = *p;
136                *p += sizeof(*info->dlease);
137        }
138
139        if (info->head->is_target) {
140                err = parse_reply_info_in(p, end, &info->targeti, features);
141                if (err < 0)
142                        goto out_bad;
143        }
144
145        if (unlikely(*p != end))
146                goto bad;
147        return 0;
148
149bad:
150        err = -EIO;
151out_bad:
152        pr_err("problem parsing mds trace %d\n", err);
153        return err;
154}
155
156/*
157 * parse readdir results
158 */
159static int parse_reply_info_dir(void **p, void *end,
160                                struct ceph_mds_reply_info_parsed *info,
161                                u64 features)
162{
163        u32 num, i = 0;
164        int err;
165
166        info->dir_dir = *p;
167        if (*p + sizeof(*info->dir_dir) > end)
168                goto bad;
169        *p += sizeof(*info->dir_dir) +
170                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
171        if (*p > end)
172                goto bad;
173
174        ceph_decode_need(p, end, sizeof(num) + 2, bad);
175        num = ceph_decode_32(p);
176        info->dir_end = ceph_decode_8(p);
177        info->dir_complete = ceph_decode_8(p);
178        if (num == 0)
179                goto done;
180
181        BUG_ON(!info->dir_in);
182        info->dir_dname = (void *)(info->dir_in + num);
183        info->dir_dname_len = (void *)(info->dir_dname + num);
184        info->dir_dlease = (void *)(info->dir_dname_len + num);
185        if ((unsigned long)(info->dir_dlease + num) >
186            (unsigned long)info->dir_in + info->dir_buf_size) {
187                pr_err("dir contents are larger than expected\n");
188                WARN_ON(1);
189                goto bad;
190        }
191
192        info->dir_nr = num;
193        while (num) {
194                /* dentry */
195                ceph_decode_need(p, end, sizeof(u32)*2, bad);
196                info->dir_dname_len[i] = ceph_decode_32(p);
197                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
198                info->dir_dname[i] = *p;
199                *p += info->dir_dname_len[i];
200                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
201                     info->dir_dname[i]);
202                info->dir_dlease[i] = *p;
203                *p += sizeof(struct ceph_mds_reply_lease);
204
205                /* inode */
206                err = parse_reply_info_in(p, end, &info->dir_in[i], features);
207                if (err < 0)
208                        goto out_bad;
209                i++;
210                num--;
211        }
212
213done:
214        if (*p != end)
215                goto bad;
216        return 0;
217
218bad:
219        err = -EIO;
220out_bad:
221        pr_err("problem parsing dir contents %d\n", err);
222        return err;
223}
224
225/*
226 * parse fcntl F_GETLK results
227 */
228static int parse_reply_info_filelock(void **p, void *end,
229                                     struct ceph_mds_reply_info_parsed *info,
230                                     u64 features)
231{
232        if (*p + sizeof(*info->filelock_reply) > end)
233                goto bad;
234
235        info->filelock_reply = *p;
236        *p += sizeof(*info->filelock_reply);
237
238        if (unlikely(*p != end))
239                goto bad;
240        return 0;
241
242bad:
243        return -EIO;
244}
245
246/*
247 * parse create results
248 */
249static int parse_reply_info_create(void **p, void *end,
250                                  struct ceph_mds_reply_info_parsed *info,
251                                  u64 features)
252{
253        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
254                if (*p == end) {
255                        info->has_create_ino = false;
256                } else {
257                        info->has_create_ino = true;
258                        info->ino = ceph_decode_64(p);
259                }
260        }
261
262        if (unlikely(*p != end))
263                goto bad;
264        return 0;
265
266bad:
267        return -EIO;
268}
269
270/*
271 * parse extra results
272 */
273static int parse_reply_info_extra(void **p, void *end,
274                                  struct ceph_mds_reply_info_parsed *info,
275                                  u64 features)
276{
277        u32 op = le32_to_cpu(info->head->op);
278
279        if (op == CEPH_MDS_OP_GETFILELOCK)
280                return parse_reply_info_filelock(p, end, info, features);
281        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
282                return parse_reply_info_dir(p, end, info, features);
283        else if (op == CEPH_MDS_OP_CREATE)
284                return parse_reply_info_create(p, end, info, features);
285        else
286                return -EIO;
287}
288
289/*
290 * parse entire mds reply
291 */
292static int parse_reply_info(struct ceph_msg *msg,
293                            struct ceph_mds_reply_info_parsed *info,
294                            u64 features)
295{
296        void *p, *end;
297        u32 len;
298        int err;
299
300        info->head = msg->front.iov_base;
301        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
302        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
303
304        /* trace */
305        ceph_decode_32_safe(&p, end, len, bad);
306        if (len > 0) {
307                ceph_decode_need(&p, end, len, bad);
308                err = parse_reply_info_trace(&p, p+len, info, features);
309                if (err < 0)
310                        goto out_bad;
311        }
312
313        /* extra */
314        ceph_decode_32_safe(&p, end, len, bad);
315        if (len > 0) {
316                ceph_decode_need(&p, end, len, bad);
317                err = parse_reply_info_extra(&p, p+len, info, features);
318                if (err < 0)
319                        goto out_bad;
320        }
321
322        /* snap blob */
323        ceph_decode_32_safe(&p, end, len, bad);
324        info->snapblob_len = len;
325        info->snapblob = p;
326        p += len;
327
328        if (p != end)
329                goto bad;
330        return 0;
331
332bad:
333        err = -EIO;
334out_bad:
335        pr_err("mds parse_reply err %d\n", err);
336        return err;
337}
338
339static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
340{
341        if (!info->dir_in)
342                return;
343        free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
344}
345
346
347/*
348 * sessions
349 */
350const char *ceph_session_state_name(int s)
351{
352        switch (s) {
353        case CEPH_MDS_SESSION_NEW: return "new";
354        case CEPH_MDS_SESSION_OPENING: return "opening";
355        case CEPH_MDS_SESSION_OPEN: return "open";
356        case CEPH_MDS_SESSION_HUNG: return "hung";
357        case CEPH_MDS_SESSION_CLOSING: return "closing";
358        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
359        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
360        default: return "???";
361        }
362}
363
364static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
365{
366        if (atomic_inc_not_zero(&s->s_ref)) {
367                dout("mdsc get_session %p %d -> %d\n", s,
368                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
369                return s;
370        } else {
371                dout("mdsc get_session %p 0 -- FAIL", s);
372                return NULL;
373        }
374}
375
376void ceph_put_mds_session(struct ceph_mds_session *s)
377{
378        dout("mdsc put_session %p %d -> %d\n", s,
379             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
380        if (atomic_dec_and_test(&s->s_ref)) {
381                if (s->s_auth.authorizer)
382                        ceph_auth_destroy_authorizer(
383                                s->s_mdsc->fsc->client->monc.auth,
384                                s->s_auth.authorizer);
385                kfree(s);
386        }
387}
388
389/*
390 * called under mdsc->mutex
391 */
392struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
393                                                   int mds)
394{
395        struct ceph_mds_session *session;
396
397        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
398                return NULL;
399        session = mdsc->sessions[mds];
400        dout("lookup_mds_session %p %d\n", session,
401             atomic_read(&session->s_ref));
402        get_session(session);
403        return session;
404}
405
406static bool __have_session(struct ceph_mds_client *mdsc, int mds)
407{
408        if (mds >= mdsc->max_sessions)
409                return false;
410        return mdsc->sessions[mds];
411}
412
413static int __verify_registered_session(struct ceph_mds_client *mdsc,
414                                       struct ceph_mds_session *s)
415{
416        if (s->s_mds >= mdsc->max_sessions ||
417            mdsc->sessions[s->s_mds] != s)
418                return -ENOENT;
419        return 0;
420}
421
422/*
423 * create+register a new session for given mds.
424 * called under mdsc->mutex.
425 */
426static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
427                                                 int mds)
428{
429        struct ceph_mds_session *s;
430
431        if (mds >= mdsc->mdsmap->m_max_mds)
432                return ERR_PTR(-EINVAL);
433
434        s = kzalloc(sizeof(*s), GFP_NOFS);
435        if (!s)
436                return ERR_PTR(-ENOMEM);
437        s->s_mdsc = mdsc;
438        s->s_mds = mds;
439        s->s_state = CEPH_MDS_SESSION_NEW;
440        s->s_ttl = 0;
441        s->s_seq = 0;
442        mutex_init(&s->s_mutex);
443
444        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
445
446        spin_lock_init(&s->s_gen_ttl_lock);
447        s->s_cap_gen = 0;
448        s->s_cap_ttl = jiffies - 1;
449
450        spin_lock_init(&s->s_cap_lock);
451        s->s_renew_requested = 0;
452        s->s_renew_seq = 0;
453        INIT_LIST_HEAD(&s->s_caps);
454        s->s_nr_caps = 0;
455        s->s_trim_caps = 0;
456        atomic_set(&s->s_ref, 1);
457        INIT_LIST_HEAD(&s->s_waiting);
458        INIT_LIST_HEAD(&s->s_unsafe);
459        s->s_num_cap_releases = 0;
460        s->s_cap_reconnect = 0;
461        s->s_cap_iterator = NULL;
462        INIT_LIST_HEAD(&s->s_cap_releases);
463        INIT_LIST_HEAD(&s->s_cap_flushing);
464        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
465
466        dout("register_session mds%d\n", mds);
467        if (mds >= mdsc->max_sessions) {
468                int newmax = 1 << get_count_order(mds+1);
469                struct ceph_mds_session **sa;
470
471                dout("register_session realloc to %d\n", newmax);
472                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
473                if (sa == NULL)
474                        goto fail_realloc;
475                if (mdsc->sessions) {
476                        memcpy(sa, mdsc->sessions,
477                               mdsc->max_sessions * sizeof(void *));
478                        kfree(mdsc->sessions);
479                }
480                mdsc->sessions = sa;
481                mdsc->max_sessions = newmax;
482        }
483        mdsc->sessions[mds] = s;
484        atomic_inc(&mdsc->num_sessions);
485        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
486
487        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
488                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
489
490        return s;
491
492fail_realloc:
493        kfree(s);
494        return ERR_PTR(-ENOMEM);
495}
496
497/*
498 * called under mdsc->mutex
499 */
500static void __unregister_session(struct ceph_mds_client *mdsc,
501                               struct ceph_mds_session *s)
502{
503        dout("__unregister_session mds%d %p\n", s->s_mds, s);
504        BUG_ON(mdsc->sessions[s->s_mds] != s);
505        mdsc->sessions[s->s_mds] = NULL;
506        ceph_con_close(&s->s_con);
507        ceph_put_mds_session(s);
508        atomic_dec(&mdsc->num_sessions);
509}
510
511/*
512 * drop session refs in request.
513 *
514 * should be last request ref, or hold mdsc->mutex
515 */
516static void put_request_session(struct ceph_mds_request *req)
517{
518        if (req->r_session) {
519                ceph_put_mds_session(req->r_session);
520                req->r_session = NULL;
521        }
522}
523
524void ceph_mdsc_release_request(struct kref *kref)
525{
526        struct ceph_mds_request *req = container_of(kref,
527                                                    struct ceph_mds_request,
528                                                    r_kref);
529        destroy_reply_info(&req->r_reply_info);
530        if (req->r_request)
531                ceph_msg_put(req->r_request);
532        if (req->r_reply)
533                ceph_msg_put(req->r_reply);
534        if (req->r_inode) {
535                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
536                iput(req->r_inode);
537        }
538        if (req->r_locked_dir)
539                ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
540        iput(req->r_target_inode);
541        if (req->r_dentry)
542                dput(req->r_dentry);
543        if (req->r_old_dentry)
544                dput(req->r_old_dentry);
545        if (req->r_old_dentry_dir) {
546                /*
547                 * track (and drop pins for) r_old_dentry_dir
548                 * separately, since r_old_dentry's d_parent may have
549                 * changed between the dir mutex being dropped and
550                 * this request being freed.
551                 */
552                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
553                                  CEPH_CAP_PIN);
554                iput(req->r_old_dentry_dir);
555        }
556        kfree(req->r_path1);
557        kfree(req->r_path2);
558        if (req->r_pagelist)
559                ceph_pagelist_release(req->r_pagelist);
560        put_request_session(req);
561        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
562        kfree(req);
563}
564
565/*
566 * lookup session, bump ref if found.
567 *
568 * called under mdsc->mutex.
569 */
570static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
571                                             u64 tid)
572{
573        struct ceph_mds_request *req;
574        struct rb_node *n = mdsc->request_tree.rb_node;
575
576        while (n) {
577                req = rb_entry(n, struct ceph_mds_request, r_node);
578                if (tid < req->r_tid)
579                        n = n->rb_left;
580                else if (tid > req->r_tid)
581                        n = n->rb_right;
582                else {
583                        ceph_mdsc_get_request(req);
584                        return req;
585                }
586        }
587        return NULL;
588}
589
590static void __insert_request(struct ceph_mds_client *mdsc,
591                             struct ceph_mds_request *new)
592{
593        struct rb_node **p = &mdsc->request_tree.rb_node;
594        struct rb_node *parent = NULL;
595        struct ceph_mds_request *req = NULL;
596
597        while (*p) {
598                parent = *p;
599                req = rb_entry(parent, struct ceph_mds_request, r_node);
600                if (new->r_tid < req->r_tid)
601                        p = &(*p)->rb_left;
602                else if (new->r_tid > req->r_tid)
603                        p = &(*p)->rb_right;
604                else
605                        BUG();
606        }
607
608        rb_link_node(&new->r_node, parent, p);
609        rb_insert_color(&new->r_node, &mdsc->request_tree);
610}
611
612/*
613 * Register an in-flight request, and assign a tid.  Link to directory
614 * are modifying (if any).
615 *
616 * Called under mdsc->mutex.
617 */
618static void __register_request(struct ceph_mds_client *mdsc,
619                               struct ceph_mds_request *req,
620                               struct inode *dir)
621{
622        req->r_tid = ++mdsc->last_tid;
623        if (req->r_num_caps)
624                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
625                                  req->r_num_caps);
626        dout("__register_request %p tid %lld\n", req, req->r_tid);
627        ceph_mdsc_get_request(req);
628        __insert_request(mdsc, req);
629
630        req->r_uid = current_fsuid();
631        req->r_gid = current_fsgid();
632
633        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
634                mdsc->oldest_tid = req->r_tid;
635
636        if (dir) {
637                ihold(dir);
638                req->r_unsafe_dir = dir;
639        }
640}
641
642static void __unregister_request(struct ceph_mds_client *mdsc,
643                                 struct ceph_mds_request *req)
644{
645        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
646
647        /* Never leave an unregistered request on an unsafe list! */
648        list_del_init(&req->r_unsafe_item);
649
650        if (req->r_tid == mdsc->oldest_tid) {
651                struct rb_node *p = rb_next(&req->r_node);
652                mdsc->oldest_tid = 0;
653                while (p) {
654                        struct ceph_mds_request *next_req =
655                                rb_entry(p, struct ceph_mds_request, r_node);
656                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
657                                mdsc->oldest_tid = next_req->r_tid;
658                                break;
659                        }
660                        p = rb_next(p);
661                }
662        }
663
664        rb_erase(&req->r_node, &mdsc->request_tree);
665        RB_CLEAR_NODE(&req->r_node);
666
667        if (req->r_unsafe_dir && req->r_got_unsafe) {
668                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
669                spin_lock(&ci->i_unsafe_lock);
670                list_del_init(&req->r_unsafe_dir_item);
671                spin_unlock(&ci->i_unsafe_lock);
672        }
673        if (req->r_target_inode && req->r_got_unsafe) {
674                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
675                spin_lock(&ci->i_unsafe_lock);
676                list_del_init(&req->r_unsafe_target_item);
677                spin_unlock(&ci->i_unsafe_lock);
678        }
679
680        if (req->r_unsafe_dir) {
681                iput(req->r_unsafe_dir);
682                req->r_unsafe_dir = NULL;
683        }
684
685        complete_all(&req->r_safe_completion);
686
687        ceph_mdsc_put_request(req);
688}
689
690/*
691 * Choose mds to send request to next.  If there is a hint set in the
692 * request (e.g., due to a prior forward hint from the mds), use that.
693 * Otherwise, consult frag tree and/or caps to identify the
694 * appropriate mds.  If all else fails, choose randomly.
695 *
696 * Called under mdsc->mutex.
697 */
698static struct dentry *get_nonsnap_parent(struct dentry *dentry)
699{
700        /*
701         * we don't need to worry about protecting the d_parent access
702         * here because we never renaming inside the snapped namespace
703         * except to resplice to another snapdir, and either the old or new
704         * result is a valid result.
705         */
706        while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
707                dentry = dentry->d_parent;
708        return dentry;
709}
710
711static int __choose_mds(struct ceph_mds_client *mdsc,
712                        struct ceph_mds_request *req)
713{
714        struct inode *inode;
715        struct ceph_inode_info *ci;
716        struct ceph_cap *cap;
717        int mode = req->r_direct_mode;
718        int mds = -1;
719        u32 hash = req->r_direct_hash;
720        bool is_hash = req->r_direct_is_hash;
721
722        /*
723         * is there a specific mds we should try?  ignore hint if we have
724         * no session and the mds is not up (active or recovering).
725         */
726        if (req->r_resend_mds >= 0 &&
727            (__have_session(mdsc, req->r_resend_mds) ||
728             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
729                dout("choose_mds using resend_mds mds%d\n",
730                     req->r_resend_mds);
731                return req->r_resend_mds;
732        }
733
734        if (mode == USE_RANDOM_MDS)
735                goto random;
736
737        inode = NULL;
738        if (req->r_inode) {
739                inode = req->r_inode;
740        } else if (req->r_dentry) {
741                /* ignore race with rename; old or new d_parent is okay */
742                struct dentry *parent = req->r_dentry->d_parent;
743                struct inode *dir = d_inode(parent);
744
745                if (dir->i_sb != mdsc->fsc->sb) {
746                        /* not this fs! */
747                        inode = d_inode(req->r_dentry);
748                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
749                        /* direct snapped/virtual snapdir requests
750                         * based on parent dir inode */
751                        struct dentry *dn = get_nonsnap_parent(parent);
752                        inode = d_inode(dn);
753                        dout("__choose_mds using nonsnap parent %p\n", inode);
754                } else {
755                        /* dentry target */
756                        inode = d_inode(req->r_dentry);
757                        if (!inode || mode == USE_AUTH_MDS) {
758                                /* dir + name */
759                                inode = dir;
760                                hash = ceph_dentry_hash(dir, req->r_dentry);
761                                is_hash = true;
762                        }
763                }
764        }
765
766        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
767             (int)hash, mode);
768        if (!inode)
769                goto random;
770        ci = ceph_inode(inode);
771
772        if (is_hash && S_ISDIR(inode->i_mode)) {
773                struct ceph_inode_frag frag;
774                int found;
775
776                ceph_choose_frag(ci, hash, &frag, &found);
777                if (found) {
778                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
779                                u8 r;
780
781                                /* choose a random replica */
782                                get_random_bytes(&r, 1);
783                                r %= frag.ndist;
784                                mds = frag.dist[r];
785                                dout("choose_mds %p %llx.%llx "
786                                     "frag %u mds%d (%d/%d)\n",
787                                     inode, ceph_vinop(inode),
788                                     frag.frag, mds,
789                                     (int)r, frag.ndist);
790                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
791                                    CEPH_MDS_STATE_ACTIVE)
792                                        return mds;
793                        }
794
795                        /* since this file/dir wasn't known to be
796                         * replicated, then we want to look for the
797                         * authoritative mds. */
798                        mode = USE_AUTH_MDS;
799                        if (frag.mds >= 0) {
800                                /* choose auth mds */
801                                mds = frag.mds;
802                                dout("choose_mds %p %llx.%llx "
803                                     "frag %u mds%d (auth)\n",
804                                     inode, ceph_vinop(inode), frag.frag, mds);
805                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
806                                    CEPH_MDS_STATE_ACTIVE)
807                                        return mds;
808                        }
809                }
810        }
811
812        spin_lock(&ci->i_ceph_lock);
813        cap = NULL;
814        if (mode == USE_AUTH_MDS)
815                cap = ci->i_auth_cap;
816        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
817                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
818        if (!cap) {
819                spin_unlock(&ci->i_ceph_lock);
820                goto random;
821        }
822        mds = cap->session->s_mds;
823        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
824             inode, ceph_vinop(inode), mds,
825             cap == ci->i_auth_cap ? "auth " : "", cap);
826        spin_unlock(&ci->i_ceph_lock);
827        return mds;
828
829random:
830        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
831        dout("choose_mds chose random mds%d\n", mds);
832        return mds;
833}
834
835
836/*
837 * session messages
838 */
839static struct ceph_msg *create_session_msg(u32 op, u64 seq)
840{
841        struct ceph_msg *msg;
842        struct ceph_mds_session_head *h;
843
844        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
845                           false);
846        if (!msg) {
847                pr_err("create_session_msg ENOMEM creating msg\n");
848                return NULL;
849        }
850        h = msg->front.iov_base;
851        h->op = cpu_to_le32(op);
852        h->seq = cpu_to_le64(seq);
853
854        return msg;
855}
856
857/*
858 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
859 * to include additional client metadata fields.
860 */
861static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
862{
863        struct ceph_msg *msg;
864        struct ceph_mds_session_head *h;
865        int i = -1;
866        int metadata_bytes = 0;
867        int metadata_key_count = 0;
868        struct ceph_options *opt = mdsc->fsc->client->options;
869        void *p;
870
871        const char* metadata[][2] = {
872                {"hostname", utsname()->nodename},
873                {"kernel_version", utsname()->release},
874                {"entity_id", opt->name ? opt->name : ""},
875                {NULL, NULL}
876        };
877
878        /* Calculate serialized length of metadata */
879        metadata_bytes = 4;  /* map length */
880        for (i = 0; metadata[i][0] != NULL; ++i) {
881                metadata_bytes += 8 + strlen(metadata[i][0]) +
882                        strlen(metadata[i][1]);
883                metadata_key_count++;
884        }
885
886        /* Allocate the message */
887        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
888                           GFP_NOFS, false);
889        if (!msg) {
890                pr_err("create_session_msg ENOMEM creating msg\n");
891                return NULL;
892        }
893        h = msg->front.iov_base;
894        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
895        h->seq = cpu_to_le64(seq);
896
897        /*
898         * Serialize client metadata into waiting buffer space, using
899         * the format that userspace expects for map<string, string>
900         *
901         * ClientSession messages with metadata are v2
902         */
903        msg->hdr.version = cpu_to_le16(2);
904        msg->hdr.compat_version = cpu_to_le16(1);
905
906        /* The write pointer, following the session_head structure */
907        p = msg->front.iov_base + sizeof(*h);
908
909        /* Number of entries in the map */
910        ceph_encode_32(&p, metadata_key_count);
911
912        /* Two length-prefixed strings for each entry in the map */
913        for (i = 0; metadata[i][0] != NULL; ++i) {
914                size_t const key_len = strlen(metadata[i][0]);
915                size_t const val_len = strlen(metadata[i][1]);
916
917                ceph_encode_32(&p, key_len);
918                memcpy(p, metadata[i][0], key_len);
919                p += key_len;
920                ceph_encode_32(&p, val_len);
921                memcpy(p, metadata[i][1], val_len);
922                p += val_len;
923        }
924
925        return msg;
926}
927
928/*
929 * send session open request.
930 *
931 * called under mdsc->mutex
932 */
933static int __open_session(struct ceph_mds_client *mdsc,
934                          struct ceph_mds_session *session)
935{
936        struct ceph_msg *msg;
937        int mstate;
938        int mds = session->s_mds;
939
940        /* wait for mds to go active? */
941        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
942        dout("open_session to mds%d (%s)\n", mds,
943             ceph_mds_state_name(mstate));
944        session->s_state = CEPH_MDS_SESSION_OPENING;
945        session->s_renew_requested = jiffies;
946
947        /* send connect message */
948        msg = create_session_open_msg(mdsc, session->s_seq);
949        if (!msg)
950                return -ENOMEM;
951        ceph_con_send(&session->s_con, msg);
952        return 0;
953}
954
955/*
956 * open sessions for any export targets for the given mds
957 *
958 * called under mdsc->mutex
959 */
960static struct ceph_mds_session *
961__open_export_target_session(struct ceph_mds_client *mdsc, int target)
962{
963        struct ceph_mds_session *session;
964
965        session = __ceph_lookup_mds_session(mdsc, target);
966        if (!session) {
967                session = register_session(mdsc, target);
968                if (IS_ERR(session))
969                        return session;
970        }
971        if (session->s_state == CEPH_MDS_SESSION_NEW ||
972            session->s_state == CEPH_MDS_SESSION_CLOSING)
973                __open_session(mdsc, session);
974
975        return session;
976}
977
978struct ceph_mds_session *
979ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
980{
981        struct ceph_mds_session *session;
982
983        dout("open_export_target_session to mds%d\n", target);
984
985        mutex_lock(&mdsc->mutex);
986        session = __open_export_target_session(mdsc, target);
987        mutex_unlock(&mdsc->mutex);
988
989        return session;
990}
991
992static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
993                                          struct ceph_mds_session *session)
994{
995        struct ceph_mds_info *mi;
996        struct ceph_mds_session *ts;
997        int i, mds = session->s_mds;
998
999        if (mds >= mdsc->mdsmap->m_max_mds)
1000                return;
1001
1002        mi = &mdsc->mdsmap->m_info[mds];
1003        dout("open_export_target_sessions for mds%d (%d targets)\n",
1004             session->s_mds, mi->num_export_targets);
1005
1006        for (i = 0; i < mi->num_export_targets; i++) {
1007                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1008                if (!IS_ERR(ts))
1009                        ceph_put_mds_session(ts);
1010        }
1011}
1012
1013void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1014                                           struct ceph_mds_session *session)
1015{
1016        mutex_lock(&mdsc->mutex);
1017        __open_export_target_sessions(mdsc, session);
1018        mutex_unlock(&mdsc->mutex);
1019}
1020
1021/*
1022 * session caps
1023 */
1024
1025/* caller holds s_cap_lock, we drop it */
1026static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
1027                                 struct ceph_mds_session *session)
1028        __releases(session->s_cap_lock)
1029{
1030        LIST_HEAD(tmp_list);
1031        list_splice_init(&session->s_cap_releases, &tmp_list);
1032        session->s_num_cap_releases = 0;
1033        spin_unlock(&session->s_cap_lock);
1034
1035        dout("cleanup_cap_releases mds%d\n", session->s_mds);
1036        while (!list_empty(&tmp_list)) {
1037                struct ceph_cap *cap;
1038                /* zero out the in-progress message */
1039                cap = list_first_entry(&tmp_list,
1040                                        struct ceph_cap, session_caps);
1041                list_del(&cap->session_caps);
1042                ceph_put_cap(mdsc, cap);
1043        }
1044}
1045
1046static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1047                                     struct ceph_mds_session *session)
1048{
1049        struct ceph_mds_request *req;
1050        struct rb_node *p;
1051
1052        dout("cleanup_session_requests mds%d\n", session->s_mds);
1053        mutex_lock(&mdsc->mutex);
1054        while (!list_empty(&session->s_unsafe)) {
1055                req = list_first_entry(&session->s_unsafe,
1056                                       struct ceph_mds_request, r_unsafe_item);
1057                pr_warn_ratelimited(" dropping unsafe request %llu\n",
1058                                    req->r_tid);
1059                __unregister_request(mdsc, req);
1060        }
1061        /* zero r_attempts, so kick_requests() will re-send requests */
1062        p = rb_first(&mdsc->request_tree);
1063        while (p) {
1064                req = rb_entry(p, struct ceph_mds_request, r_node);
1065                p = rb_next(p);
1066                if (req->r_session &&
1067                    req->r_session->s_mds == session->s_mds)
1068                        req->r_attempts = 0;
1069        }
1070        mutex_unlock(&mdsc->mutex);
1071}
1072
1073/*
1074 * Helper to safely iterate over all caps associated with a session, with
1075 * special care taken to handle a racing __ceph_remove_cap().
1076 *
1077 * Caller must hold session s_mutex.
1078 */
1079static int iterate_session_caps(struct ceph_mds_session *session,
1080                                 int (*cb)(struct inode *, struct ceph_cap *,
1081                                            void *), void *arg)
1082{
1083        struct list_head *p;
1084        struct ceph_cap *cap;
1085        struct inode *inode, *last_inode = NULL;
1086        struct ceph_cap *old_cap = NULL;
1087        int ret;
1088
1089        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1090        spin_lock(&session->s_cap_lock);
1091        p = session->s_caps.next;
1092        while (p != &session->s_caps) {
1093                cap = list_entry(p, struct ceph_cap, session_caps);
1094                inode = igrab(&cap->ci->vfs_inode);
1095                if (!inode) {
1096                        p = p->next;
1097                        continue;
1098                }
1099                session->s_cap_iterator = cap;
1100                spin_unlock(&session->s_cap_lock);
1101
1102                if (last_inode) {
1103                        iput(last_inode);
1104                        last_inode = NULL;
1105                }
1106                if (old_cap) {
1107                        ceph_put_cap(session->s_mdsc, old_cap);
1108                        old_cap = NULL;
1109                }
1110
1111                ret = cb(inode, cap, arg);
1112                last_inode = inode;
1113
1114                spin_lock(&session->s_cap_lock);
1115                p = p->next;
1116                if (cap->ci == NULL) {
1117                        dout("iterate_session_caps  finishing cap %p removal\n",
1118                             cap);
1119                        BUG_ON(cap->session != session);
1120                        cap->session = NULL;
1121                        list_del_init(&cap->session_caps);
1122                        session->s_nr_caps--;
1123                        if (cap->queue_release) {
1124                                list_add_tail(&cap->session_caps,
1125                                              &session->s_cap_releases);
1126                                session->s_num_cap_releases++;
1127                        } else {
1128                                old_cap = cap;  /* put_cap it w/o locks held */
1129                        }
1130                }
1131                if (ret < 0)
1132                        goto out;
1133        }
1134        ret = 0;
1135out:
1136        session->s_cap_iterator = NULL;
1137        spin_unlock(&session->s_cap_lock);
1138
1139        iput(last_inode);
1140        if (old_cap)
1141                ceph_put_cap(session->s_mdsc, old_cap);
1142
1143        return ret;
1144}
1145
1146static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1147                                  void *arg)
1148{
1149        struct ceph_inode_info *ci = ceph_inode(inode);
1150        LIST_HEAD(to_remove);
1151        int drop = 0;
1152
1153        dout("removing cap %p, ci is %p, inode is %p\n",
1154             cap, ci, &ci->vfs_inode);
1155        spin_lock(&ci->i_ceph_lock);
1156        __ceph_remove_cap(cap, false);
1157        if (!ci->i_auth_cap) {
1158                struct ceph_cap_flush *cf;
1159                struct ceph_mds_client *mdsc =
1160                        ceph_sb_to_client(inode->i_sb)->mdsc;
1161
1162                while (true) {
1163                        struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
1164                        if (!n)
1165                                break;
1166                        cf = rb_entry(n, struct ceph_cap_flush, i_node);
1167                        rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
1168                        list_add(&cf->list, &to_remove);
1169                }
1170
1171                spin_lock(&mdsc->cap_dirty_lock);
1172
1173                list_for_each_entry(cf, &to_remove, list)
1174                        rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
1175
1176                if (!list_empty(&ci->i_dirty_item)) {
1177                        pr_warn_ratelimited(
1178                                " dropping dirty %s state for %p %lld\n",
1179                                ceph_cap_string(ci->i_dirty_caps),
1180                                inode, ceph_ino(inode));
1181                        ci->i_dirty_caps = 0;
1182                        list_del_init(&ci->i_dirty_item);
1183                        drop = 1;
1184                }
1185                if (!list_empty(&ci->i_flushing_item)) {
1186                        pr_warn_ratelimited(
1187                                " dropping dirty+flushing %s state for %p %lld\n",
1188                                ceph_cap_string(ci->i_flushing_caps),
1189                                inode, ceph_ino(inode));
1190                        ci->i_flushing_caps = 0;
1191                        list_del_init(&ci->i_flushing_item);
1192                        mdsc->num_cap_flushing--;
1193                        drop = 1;
1194                }
1195                spin_unlock(&mdsc->cap_dirty_lock);
1196
1197                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1198                        list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
1199                        ci->i_prealloc_cap_flush = NULL;
1200                }
1201        }
1202        spin_unlock(&ci->i_ceph_lock);
1203        while (!list_empty(&to_remove)) {
1204                struct ceph_cap_flush *cf;
1205                cf = list_first_entry(&to_remove,
1206                                      struct ceph_cap_flush, list);
1207                list_del(&cf->list);
1208                ceph_free_cap_flush(cf);
1209        }
1210        while (drop--)
1211                iput(inode);
1212        return 0;
1213}
1214
1215/*
1216 * caller must hold session s_mutex
1217 */
1218static void remove_session_caps(struct ceph_mds_session *session)
1219{
1220        dout("remove_session_caps on %p\n", session);
1221        iterate_session_caps(session, remove_session_caps_cb, NULL);
1222
1223        spin_lock(&session->s_cap_lock);
1224        if (session->s_nr_caps > 0) {
1225                struct super_block *sb = session->s_mdsc->fsc->sb;
1226                struct inode *inode;
1227                struct ceph_cap *cap, *prev = NULL;
1228                struct ceph_vino vino;
1229                /*
1230                 * iterate_session_caps() skips inodes that are being
1231                 * deleted, we need to wait until deletions are complete.
1232                 * __wait_on_freeing_inode() is designed for the job,
1233                 * but it is not exported, so use lookup inode function
1234                 * to access it.
1235                 */
1236                while (!list_empty(&session->s_caps)) {
1237                        cap = list_entry(session->s_caps.next,
1238                                         struct ceph_cap, session_caps);
1239                        if (cap == prev)
1240                                break;
1241                        prev = cap;
1242                        vino = cap->ci->i_vino;
1243                        spin_unlock(&session->s_cap_lock);
1244
1245                        inode = ceph_find_inode(sb, vino);
1246                        iput(inode);
1247
1248                        spin_lock(&session->s_cap_lock);
1249                }
1250        }
1251
1252        // drop cap expires and unlock s_cap_lock
1253        cleanup_cap_releases(session->s_mdsc, session);
1254
1255        BUG_ON(session->s_nr_caps > 0);
1256        BUG_ON(!list_empty(&session->s_cap_flushing));
1257}
1258
1259/*
1260 * wake up any threads waiting on this session's caps.  if the cap is
1261 * old (didn't get renewed on the client reconnect), remove it now.
1262 *
1263 * caller must hold s_mutex.
1264 */
1265static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1266                              void *arg)
1267{
1268        struct ceph_inode_info *ci = ceph_inode(inode);
1269
1270        wake_up_all(&ci->i_cap_wq);
1271        if (arg) {
1272                spin_lock(&ci->i_ceph_lock);
1273                ci->i_wanted_max_size = 0;
1274                ci->i_requested_max_size = 0;
1275                spin_unlock(&ci->i_ceph_lock);
1276        }
1277        return 0;
1278}
1279
1280static void wake_up_session_caps(struct ceph_mds_session *session,
1281                                 int reconnect)
1282{
1283        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1284        iterate_session_caps(session, wake_up_session_cb,
1285                             (void *)(unsigned long)reconnect);
1286}
1287
1288/*
1289 * Send periodic message to MDS renewing all currently held caps.  The
1290 * ack will reset the expiration for all caps from this session.
1291 *
1292 * caller holds s_mutex
1293 */
1294static int send_renew_caps(struct ceph_mds_client *mdsc,
1295                           struct ceph_mds_session *session)
1296{
1297        struct ceph_msg *msg;
1298        int state;
1299
1300        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1301            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1302                pr_info("mds%d caps stale\n", session->s_mds);
1303        session->s_renew_requested = jiffies;
1304
1305        /* do not try to renew caps until a recovering mds has reconnected
1306         * with its clients. */
1307        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1308        if (state < CEPH_MDS_STATE_RECONNECT) {
1309                dout("send_renew_caps ignoring mds%d (%s)\n",
1310                     session->s_mds, ceph_mds_state_name(state));
1311                return 0;
1312        }
1313
1314        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1315                ceph_mds_state_name(state));
1316        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1317                                 ++session->s_renew_seq);
1318        if (!msg)
1319                return -ENOMEM;
1320        ceph_con_send(&session->s_con, msg);
1321        return 0;
1322}
1323
1324static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1325                             struct ceph_mds_session *session, u64 seq)
1326{
1327        struct ceph_msg *msg;
1328
1329        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1330             session->s_mds, ceph_session_state_name(session->s_state), seq);
1331        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1332        if (!msg)
1333                return -ENOMEM;
1334        ceph_con_send(&session->s_con, msg);
1335        return 0;
1336}
1337
1338
1339/*
1340 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1341 *
1342 * Called under session->s_mutex
1343 */
1344static void renewed_caps(struct ceph_mds_client *mdsc,
1345                         struct ceph_mds_session *session, int is_renew)
1346{
1347        int was_stale;
1348        int wake = 0;
1349
1350        spin_lock(&session->s_cap_lock);
1351        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1352
1353        session->s_cap_ttl = session->s_renew_requested +
1354                mdsc->mdsmap->m_session_timeout*HZ;
1355
1356        if (was_stale) {
1357                if (time_before(jiffies, session->s_cap_ttl)) {
1358                        pr_info("mds%d caps renewed\n", session->s_mds);
1359                        wake = 1;
1360                } else {
1361                        pr_info("mds%d caps still stale\n", session->s_mds);
1362                }
1363        }
1364        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1365             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1366             time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1367        spin_unlock(&session->s_cap_lock);
1368
1369        if (wake)
1370                wake_up_session_caps(session, 0);
1371}
1372
1373/*
1374 * send a session close request
1375 */
1376static int request_close_session(struct ceph_mds_client *mdsc,
1377                                 struct ceph_mds_session *session)
1378{
1379        struct ceph_msg *msg;
1380
1381        dout("request_close_session mds%d state %s seq %lld\n",
1382             session->s_mds, ceph_session_state_name(session->s_state),
1383             session->s_seq);
1384        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1385        if (!msg)
1386                return -ENOMEM;
1387        ceph_con_send(&session->s_con, msg);
1388        return 0;
1389}
1390
1391/*
1392 * Called with s_mutex held.
1393 */
1394static int __close_session(struct ceph_mds_client *mdsc,
1395                         struct ceph_mds_session *session)
1396{
1397        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1398                return 0;
1399        session->s_state = CEPH_MDS_SESSION_CLOSING;
1400        return request_close_session(mdsc, session);
1401}
1402
1403/*
1404 * Trim old(er) caps.
1405 *
1406 * Because we can't cache an inode without one or more caps, we do
1407 * this indirectly: if a cap is unused, we prune its aliases, at which
1408 * point the inode will hopefully get dropped to.
1409 *
1410 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1411 * memory pressure from the MDS, though, so it needn't be perfect.
1412 */
1413static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1414{
1415        struct ceph_mds_session *session = arg;
1416        struct ceph_inode_info *ci = ceph_inode(inode);
1417        int used, wanted, oissued, mine;
1418
1419        if (session->s_trim_caps <= 0)
1420                return -1;
1421
1422        spin_lock(&ci->i_ceph_lock);
1423        mine = cap->issued | cap->implemented;
1424        used = __ceph_caps_used(ci);
1425        wanted = __ceph_caps_file_wanted(ci);
1426        oissued = __ceph_caps_issued_other(ci, cap);
1427
1428        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1429             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1430             ceph_cap_string(used), ceph_cap_string(wanted));
1431        if (cap == ci->i_auth_cap) {
1432                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1433                    !list_empty(&ci->i_cap_snaps))
1434                        goto out;
1435                if ((used | wanted) & CEPH_CAP_ANY_WR)
1436                        goto out;
1437        }
1438        /* The inode has cached pages, but it's no longer used.
1439         * we can safely drop it */
1440        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1441            !(oissued & CEPH_CAP_FILE_CACHE)) {
1442          used = 0;
1443          oissued = 0;
1444        }
1445        if ((used | wanted) & ~oissued & mine)
1446                goto out;   /* we need these caps */
1447
1448        session->s_trim_caps--;
1449        if (oissued) {
1450                /* we aren't the only cap.. just remove us */
1451                __ceph_remove_cap(cap, true);
1452        } else {
1453                /* try dropping referring dentries */
1454                spin_unlock(&ci->i_ceph_lock);
1455                d_prune_aliases(inode);
1456                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1457                     inode, cap, atomic_read(&inode->i_count));
1458                return 0;
1459        }
1460
1461out:
1462        spin_unlock(&ci->i_ceph_lock);
1463        return 0;
1464}
1465
1466/*
1467 * Trim session cap count down to some max number.
1468 */
1469static int trim_caps(struct ceph_mds_client *mdsc,
1470                     struct ceph_mds_session *session,
1471                     int max_caps)
1472{
1473        int trim_caps = session->s_nr_caps - max_caps;
1474
1475        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1476             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1477        if (trim_caps > 0) {
1478                session->s_trim_caps = trim_caps;
1479                iterate_session_caps(session, trim_caps_cb, session);
1480                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1481                     session->s_mds, session->s_nr_caps, max_caps,
1482                        trim_caps - session->s_trim_caps);
1483                session->s_trim_caps = 0;
1484        }
1485
1486        ceph_send_cap_releases(mdsc, session);
1487        return 0;
1488}
1489
1490static int check_capsnap_flush(struct ceph_inode_info *ci,
1491                               u64 want_snap_seq)
1492{
1493        int ret = 1;
1494        spin_lock(&ci->i_ceph_lock);
1495        if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
1496                struct ceph_cap_snap *capsnap =
1497                        list_first_entry(&ci->i_cap_snaps,
1498                                         struct ceph_cap_snap, ci_item);
1499                ret = capsnap->follows >= want_snap_seq;
1500        }
1501        spin_unlock(&ci->i_ceph_lock);
1502        return ret;
1503}
1504
1505static int check_caps_flush(struct ceph_mds_client *mdsc,
1506                            u64 want_flush_tid)
1507{
1508        struct rb_node *n;
1509        struct ceph_cap_flush *cf;
1510        int ret = 1;
1511
1512        spin_lock(&mdsc->cap_dirty_lock);
1513        n = rb_first(&mdsc->cap_flush_tree);
1514        cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
1515        if (cf && cf->tid <= want_flush_tid) {
1516                dout("check_caps_flush still flushing tid %llu <= %llu\n",
1517                     cf->tid, want_flush_tid);
1518                ret = 0;
1519        }
1520        spin_unlock(&mdsc->cap_dirty_lock);
1521        return ret;
1522}
1523
1524/*
1525 * flush all dirty inode data to disk.
1526 *
1527 * returns true if we've flushed through want_flush_tid
1528 */
1529static void wait_caps_flush(struct ceph_mds_client *mdsc,
1530                            u64 want_flush_tid, u64 want_snap_seq)
1531{
1532        int mds;
1533
1534        dout("check_caps_flush want %llu snap want %llu\n",
1535             want_flush_tid, want_snap_seq);
1536        mutex_lock(&mdsc->mutex);
1537        for (mds = 0; mds < mdsc->max_sessions; ) {
1538                struct ceph_mds_session *session = mdsc->sessions[mds];
1539                struct inode *inode = NULL;
1540
1541                if (!session) {
1542                        mds++;
1543                        continue;
1544                }
1545                get_session(session);
1546                mutex_unlock(&mdsc->mutex);
1547
1548                mutex_lock(&session->s_mutex);
1549                if (!list_empty(&session->s_cap_snaps_flushing)) {
1550                        struct ceph_cap_snap *capsnap =
1551                                list_first_entry(&session->s_cap_snaps_flushing,
1552                                                 struct ceph_cap_snap,
1553                                                 flushing_item);
1554                        struct ceph_inode_info *ci = capsnap->ci;
1555                        if (!check_capsnap_flush(ci, want_snap_seq)) {
1556                                dout("check_cap_flush still flushing snap %p "
1557                                     "follows %lld <= %lld to mds%d\n",
1558                                     &ci->vfs_inode, capsnap->follows,
1559                                     want_snap_seq, mds);
1560                                inode = igrab(&ci->vfs_inode);
1561                        }
1562                }
1563                mutex_unlock(&session->s_mutex);
1564                ceph_put_mds_session(session);
1565
1566                if (inode) {
1567                        wait_event(mdsc->cap_flushing_wq,
1568                                   check_capsnap_flush(ceph_inode(inode),
1569                                                       want_snap_seq));
1570                        iput(inode);
1571                } else {
1572                        mds++;
1573                }
1574
1575                mutex_lock(&mdsc->mutex);
1576        }
1577        mutex_unlock(&mdsc->mutex);
1578
1579        wait_event(mdsc->cap_flushing_wq,
1580                   check_caps_flush(mdsc, want_flush_tid));
1581
1582        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1583}
1584
1585/*
1586 * called under s_mutex
1587 */
1588void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1589                            struct ceph_mds_session *session)
1590{
1591        struct ceph_msg *msg = NULL;
1592        struct ceph_mds_cap_release *head;
1593        struct ceph_mds_cap_item *item;
1594        struct ceph_cap *cap;
1595        LIST_HEAD(tmp_list);
1596        int num_cap_releases;
1597
1598        spin_lock(&session->s_cap_lock);
1599again:
1600        list_splice_init(&session->s_cap_releases, &tmp_list);
1601        num_cap_releases = session->s_num_cap_releases;
1602        session->s_num_cap_releases = 0;
1603        spin_unlock(&session->s_cap_lock);
1604
1605        while (!list_empty(&tmp_list)) {
1606                if (!msg) {
1607                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1608                                        PAGE_CACHE_SIZE, GFP_NOFS, false);
1609                        if (!msg)
1610                                goto out_err;
1611                        head = msg->front.iov_base;
1612                        head->num = cpu_to_le32(0);
1613                        msg->front.iov_len = sizeof(*head);
1614                }
1615                cap = list_first_entry(&tmp_list, struct ceph_cap,
1616                                        session_caps);
1617                list_del(&cap->session_caps);
1618                num_cap_releases--;
1619
1620                head = msg->front.iov_base;
1621                le32_add_cpu(&head->num, 1);
1622                item = msg->front.iov_base + msg->front.iov_len;
1623                item->ino = cpu_to_le64(cap->cap_ino);
1624                item->cap_id = cpu_to_le64(cap->cap_id);
1625                item->migrate_seq = cpu_to_le32(cap->mseq);
1626                item->seq = cpu_to_le32(cap->issue_seq);
1627                msg->front.iov_len += sizeof(*item);
1628
1629                ceph_put_cap(mdsc, cap);
1630
1631                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1632                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1633                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1634                        ceph_con_send(&session->s_con, msg);
1635                        msg = NULL;
1636                }
1637        }
1638
1639        BUG_ON(num_cap_releases != 0);
1640
1641        spin_lock(&session->s_cap_lock);
1642        if (!list_empty(&session->s_cap_releases))
1643                goto again;
1644        spin_unlock(&session->s_cap_lock);
1645
1646        if (msg) {
1647                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1648                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1649                ceph_con_send(&session->s_con, msg);
1650        }
1651        return;
1652out_err:
1653        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1654                session->s_mds);
1655        spin_lock(&session->s_cap_lock);
1656        list_splice(&tmp_list, &session->s_cap_releases);
1657        session->s_num_cap_releases += num_cap_releases;
1658        spin_unlock(&session->s_cap_lock);
1659}
1660
1661/*
1662 * requests
1663 */
1664
1665int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1666                                    struct inode *dir)
1667{
1668        struct ceph_inode_info *ci = ceph_inode(dir);
1669        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1670        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1671        size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1672                      sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1673        int order, num_entries;
1674
1675        spin_lock(&ci->i_ceph_lock);
1676        num_entries = ci->i_files + ci->i_subdirs;
1677        spin_unlock(&ci->i_ceph_lock);
1678        num_entries = max(num_entries, 1);
1679        num_entries = min(num_entries, opt->max_readdir);
1680
1681        order = get_order(size * num_entries);
1682        while (order >= 0) {
1683                rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
1684                                                        __GFP_NOWARN,
1685                                                        order);
1686                if (rinfo->dir_in)
1687                        break;
1688                order--;
1689        }
1690        if (!rinfo->dir_in)
1691                return -ENOMEM;
1692
1693        num_entries = (PAGE_SIZE << order) / size;
1694        num_entries = min(num_entries, opt->max_readdir);
1695
1696        rinfo->dir_buf_size = PAGE_SIZE << order;
1697        req->r_num_caps = num_entries + 1;
1698        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1699        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1700        return 0;
1701}
1702
1703/*
1704 * Create an mds request.
1705 */
1706struct ceph_mds_request *
1707ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1708{
1709        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1710
1711        if (!req)
1712                return ERR_PTR(-ENOMEM);
1713
1714        mutex_init(&req->r_fill_mutex);
1715        req->r_mdsc = mdsc;
1716        req->r_started = jiffies;
1717        req->r_resend_mds = -1;
1718        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1719        INIT_LIST_HEAD(&req->r_unsafe_target_item);
1720        req->r_fmode = -1;
1721        kref_init(&req->r_kref);
1722        INIT_LIST_HEAD(&req->r_wait);
1723        init_completion(&req->r_completion);
1724        init_completion(&req->r_safe_completion);
1725        INIT_LIST_HEAD(&req->r_unsafe_item);
1726
1727        req->r_stamp = CURRENT_TIME;
1728
1729        req->r_op = op;
1730        req->r_direct_mode = mode;
1731        return req;
1732}
1733
1734/*
1735 * return oldest (lowest) request, tid in request tree, 0 if none.
1736 *
1737 * called under mdsc->mutex.
1738 */
1739static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1740{
1741        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1742                return NULL;
1743        return rb_entry(rb_first(&mdsc->request_tree),
1744                        struct ceph_mds_request, r_node);
1745}
1746
1747static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1748{
1749        return mdsc->oldest_tid;
1750}
1751
1752/*
1753 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1754 * on build_path_from_dentry in fs/cifs/dir.c.
1755 *
1756 * If @stop_on_nosnap, generate path relative to the first non-snapped
1757 * inode.
1758 *
1759 * Encode hidden .snap dirs as a double /, i.e.
1760 *   foo/.snap/bar -> foo//bar
1761 */
1762char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1763                           int stop_on_nosnap)
1764{
1765        struct dentry *temp;
1766        char *path;
1767        int len, pos;
1768        unsigned seq;
1769
1770        if (dentry == NULL)
1771                return ERR_PTR(-EINVAL);
1772
1773retry:
1774        len = 0;
1775        seq = read_seqbegin(&rename_lock);
1776        rcu_read_lock();
1777        for (temp = dentry; !IS_ROOT(temp);) {
1778                struct inode *inode = d_inode(temp);
1779                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1780                        len++;  /* slash only */
1781                else if (stop_on_nosnap && inode &&
1782                         ceph_snap(inode) == CEPH_NOSNAP)
1783                        break;
1784                else
1785                        len += 1 + temp->d_name.len;
1786                temp = temp->d_parent;
1787        }
1788        rcu_read_unlock();
1789        if (len)
1790                len--;  /* no leading '/' */
1791
1792        path = kmalloc(len+1, GFP_NOFS);
1793        if (path == NULL)
1794                return ERR_PTR(-ENOMEM);
1795        pos = len;
1796        path[pos] = 0;  /* trailing null */
1797        rcu_read_lock();
1798        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1799                struct inode *inode;
1800
1801                spin_lock(&temp->d_lock);
1802                inode = d_inode(temp);
1803                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1804                        dout("build_path path+%d: %p SNAPDIR\n",
1805                             pos, temp);
1806                } else if (stop_on_nosnap && inode &&
1807                           ceph_snap(inode) == CEPH_NOSNAP) {
1808                        spin_unlock(&temp->d_lock);
1809                        break;
1810                } else {
1811                        pos -= temp->d_name.len;
1812                        if (pos < 0) {
1813                                spin_unlock(&temp->d_lock);
1814                                break;
1815                        }
1816                        strncpy(path + pos, temp->d_name.name,
1817                                temp->d_name.len);
1818                }
1819                spin_unlock(&temp->d_lock);
1820                if (pos)
1821                        path[--pos] = '/';
1822                temp = temp->d_parent;
1823        }
1824        rcu_read_unlock();
1825        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1826                pr_err("build_path did not end path lookup where "
1827                       "expected, namelen is %d, pos is %d\n", len, pos);
1828                /* presumably this is only possible if racing with a
1829                   rename of one of the parent directories (we can not
1830                   lock the dentries above us to prevent this, but
1831                   retrying should be harmless) */
1832                kfree(path);
1833                goto retry;
1834        }
1835
1836        *base = ceph_ino(d_inode(temp));
1837        *plen = len;
1838        dout("build_path on %p %d built %llx '%.*s'\n",
1839             dentry, d_count(dentry), *base, len, path);
1840        return path;
1841}
1842
1843static int build_dentry_path(struct dentry *dentry,
1844                             const char **ppath, int *ppathlen, u64 *pino,
1845                             int *pfreepath)
1846{
1847        char *path;
1848
1849        if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
1850                *pino = ceph_ino(d_inode(dentry->d_parent));
1851                *ppath = dentry->d_name.name;
1852                *ppathlen = dentry->d_name.len;
1853                return 0;
1854        }
1855        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1856        if (IS_ERR(path))
1857                return PTR_ERR(path);
1858        *ppath = path;
1859        *pfreepath = 1;
1860        return 0;
1861}
1862
1863static int build_inode_path(struct inode *inode,
1864                            const char **ppath, int *ppathlen, u64 *pino,
1865                            int *pfreepath)
1866{
1867        struct dentry *dentry;
1868        char *path;
1869
1870        if (ceph_snap(inode) == CEPH_NOSNAP) {
1871                *pino = ceph_ino(inode);
1872                *ppathlen = 0;
1873                return 0;
1874        }
1875        dentry = d_find_alias(inode);
1876        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1877        dput(dentry);
1878        if (IS_ERR(path))
1879                return PTR_ERR(path);
1880        *ppath = path;
1881        *pfreepath = 1;
1882        return 0;
1883}
1884
1885/*
1886 * request arguments may be specified via an inode *, a dentry *, or
1887 * an explicit ino+path.
1888 */
1889static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1890                                  const char *rpath, u64 rino,
1891                                  const char **ppath, int *pathlen,
1892                                  u64 *ino, int *freepath)
1893{
1894        int r = 0;
1895
1896        if (rinode) {
1897                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1898                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1899                     ceph_snap(rinode));
1900        } else if (rdentry) {
1901                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1902                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1903                     *ppath);
1904        } else if (rpath || rino) {
1905                *ino = rino;
1906                *ppath = rpath;
1907                *pathlen = rpath ? strlen(rpath) : 0;
1908                dout(" path %.*s\n", *pathlen, rpath);
1909        }
1910
1911        return r;
1912}
1913
1914/*
1915 * called under mdsc->mutex
1916 */
1917static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1918                                               struct ceph_mds_request *req,
1919                                               int mds, bool drop_cap_releases)
1920{
1921        struct ceph_msg *msg;
1922        struct ceph_mds_request_head *head;
1923        const char *path1 = NULL;
1924        const char *path2 = NULL;
1925        u64 ino1 = 0, ino2 = 0;
1926        int pathlen1 = 0, pathlen2 = 0;
1927        int freepath1 = 0, freepath2 = 0;
1928        int len;
1929        u16 releases;
1930        void *p, *end;
1931        int ret;
1932
1933        ret = set_request_path_attr(req->r_inode, req->r_dentry,
1934                              req->r_path1, req->r_ino1.ino,
1935                              &path1, &pathlen1, &ino1, &freepath1);
1936        if (ret < 0) {
1937                msg = ERR_PTR(ret);
1938                goto out;
1939        }
1940
1941        ret = set_request_path_attr(NULL, req->r_old_dentry,
1942                              req->r_path2, req->r_ino2.ino,
1943                              &path2, &pathlen2, &ino2, &freepath2);
1944        if (ret < 0) {
1945                msg = ERR_PTR(ret);
1946                goto out_free1;
1947        }
1948
1949        len = sizeof(*head) +
1950                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1951                sizeof(struct ceph_timespec);
1952
1953        /* calculate (max) length for cap releases */
1954        len += sizeof(struct ceph_mds_request_release) *
1955                (!!req->r_inode_drop + !!req->r_dentry_drop +
1956                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1957        if (req->r_dentry_drop)
1958                len += req->r_dentry->d_name.len;
1959        if (req->r_old_dentry_drop)
1960                len += req->r_old_dentry->d_name.len;
1961
1962        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1963        if (!msg) {
1964                msg = ERR_PTR(-ENOMEM);
1965                goto out_free2;
1966        }
1967
1968        msg->hdr.version = cpu_to_le16(2);
1969        msg->hdr.tid = cpu_to_le64(req->r_tid);
1970
1971        head = msg->front.iov_base;
1972        p = msg->front.iov_base + sizeof(*head);
1973        end = msg->front.iov_base + msg->front.iov_len;
1974
1975        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1976        head->op = cpu_to_le32(req->r_op);
1977        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1978        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1979        head->args = req->r_args;
1980
1981        ceph_encode_filepath(&p, end, ino1, path1);
1982        ceph_encode_filepath(&p, end, ino2, path2);
1983
1984        /* make note of release offset, in case we need to replay */
1985        req->r_request_release_offset = p - msg->front.iov_base;
1986
1987        /* cap releases */
1988        releases = 0;
1989        if (req->r_inode_drop)
1990                releases += ceph_encode_inode_release(&p,
1991                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
1992                      mds, req->r_inode_drop, req->r_inode_unless, 0);
1993        if (req->r_dentry_drop)
1994                releases += ceph_encode_dentry_release(&p, req->r_dentry,
1995                       mds, req->r_dentry_drop, req->r_dentry_unless);
1996        if (req->r_old_dentry_drop)
1997                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1998                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1999        if (req->r_old_inode_drop)
2000                releases += ceph_encode_inode_release(&p,
2001                      d_inode(req->r_old_dentry),
2002                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2003
2004        if (drop_cap_releases) {
2005                releases = 0;
2006                p = msg->front.iov_base + req->r_request_release_offset;
2007        }
2008
2009        head->num_releases = cpu_to_le16(releases);
2010
2011        /* time stamp */
2012        {
2013                struct ceph_timespec ts;
2014                ceph_encode_timespec(&ts, &req->r_stamp);
2015                ceph_encode_copy(&p, &ts, sizeof(ts));
2016        }
2017
2018        BUG_ON(p > end);
2019        msg->front.iov_len = p - msg->front.iov_base;
2020        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2021
2022        if (req->r_pagelist) {
2023                struct ceph_pagelist *pagelist = req->r_pagelist;
2024                atomic_inc(&pagelist->refcnt);
2025                ceph_msg_data_add_pagelist(msg, pagelist);
2026                msg->hdr.data_len = cpu_to_le32(pagelist->length);
2027        } else {
2028                msg->hdr.data_len = 0;
2029        }
2030
2031        msg->hdr.data_off = cpu_to_le16(0);
2032
2033out_free2:
2034        if (freepath2)
2035                kfree((char *)path2);
2036out_free1:
2037        if (freepath1)
2038                kfree((char *)path1);
2039out:
2040        return msg;
2041}
2042
2043/*
2044 * called under mdsc->mutex if error, under no mutex if
2045 * success.
2046 */
2047static void complete_request(struct ceph_mds_client *mdsc,
2048                             struct ceph_mds_request *req)
2049{
2050        if (req->r_callback)
2051                req->r_callback(mdsc, req);
2052        else
2053                complete_all(&req->r_completion);
2054}
2055
2056/*
2057 * called under mdsc->mutex
2058 */
2059static int __prepare_send_request(struct ceph_mds_client *mdsc,
2060                                  struct ceph_mds_request *req,
2061                                  int mds, bool drop_cap_releases)
2062{
2063        struct ceph_mds_request_head *rhead;
2064        struct ceph_msg *msg;
2065        int flags = 0;
2066
2067        req->r_attempts++;
2068        if (req->r_inode) {
2069                struct ceph_cap *cap =
2070                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2071
2072                if (cap)
2073                        req->r_sent_on_mseq = cap->mseq;
2074                else
2075                        req->r_sent_on_mseq = -1;
2076        }
2077        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2078             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2079
2080        if (req->r_got_unsafe) {
2081                void *p;
2082                /*
2083                 * Replay.  Do not regenerate message (and rebuild
2084                 * paths, etc.); just use the original message.
2085                 * Rebuilding paths will break for renames because
2086                 * d_move mangles the src name.
2087                 */
2088                msg = req->r_request;
2089                rhead = msg->front.iov_base;
2090
2091                flags = le32_to_cpu(rhead->flags);
2092                flags |= CEPH_MDS_FLAG_REPLAY;
2093                rhead->flags = cpu_to_le32(flags);
2094
2095                if (req->r_target_inode)
2096                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2097
2098                rhead->num_retry = req->r_attempts - 1;
2099
2100                /* remove cap/dentry releases from message */
2101                rhead->num_releases = 0;
2102
2103                /* time stamp */
2104                p = msg->front.iov_base + req->r_request_release_offset;
2105                {
2106                        struct ceph_timespec ts;
2107                        ceph_encode_timespec(&ts, &req->r_stamp);
2108                        ceph_encode_copy(&p, &ts, sizeof(ts));
2109                }
2110
2111                msg->front.iov_len = p - msg->front.iov_base;
2112                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2113                return 0;
2114        }
2115
2116        if (req->r_request) {
2117                ceph_msg_put(req->r_request);
2118                req->r_request = NULL;
2119        }
2120        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2121        if (IS_ERR(msg)) {
2122                req->r_err = PTR_ERR(msg);
2123                return PTR_ERR(msg);
2124        }
2125        req->r_request = msg;
2126
2127        rhead = msg->front.iov_base;
2128        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2129        if (req->r_got_unsafe)
2130                flags |= CEPH_MDS_FLAG_REPLAY;
2131        if (req->r_locked_dir)
2132                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2133        rhead->flags = cpu_to_le32(flags);
2134        rhead->num_fwd = req->r_num_fwd;
2135        rhead->num_retry = req->r_attempts - 1;
2136        rhead->ino = 0;
2137
2138        dout(" r_locked_dir = %p\n", req->r_locked_dir);
2139        return 0;
2140}
2141
2142/*
2143 * send request, or put it on the appropriate wait list.
2144 */
2145static int __do_request(struct ceph_mds_client *mdsc,
2146                        struct ceph_mds_request *req)
2147{
2148        struct ceph_mds_session *session = NULL;
2149        int mds = -1;
2150        int err = 0;
2151
2152        if (req->r_err || req->r_got_result) {
2153                if (req->r_aborted)
2154                        __unregister_request(mdsc, req);
2155                goto out;
2156        }
2157
2158        if (req->r_timeout &&
2159            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2160                dout("do_request timed out\n");
2161                err = -EIO;
2162                goto finish;
2163        }
2164        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2165                dout("do_request forced umount\n");
2166                err = -EIO;
2167                goto finish;
2168        }
2169
2170        put_request_session(req);
2171
2172        mds = __choose_mds(mdsc, req);
2173        if (mds < 0 ||
2174            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2175                dout("do_request no mds or not active, waiting for map\n");
2176                list_add(&req->r_wait, &mdsc->waiting_for_map);
2177                goto out;
2178        }
2179
2180        /* get, open session */
2181        session = __ceph_lookup_mds_session(mdsc, mds);
2182        if (!session) {
2183                session = register_session(mdsc, mds);
2184                if (IS_ERR(session)) {
2185                        err = PTR_ERR(session);
2186                        goto finish;
2187                }
2188        }
2189        req->r_session = get_session(session);
2190
2191        dout("do_request mds%d session %p state %s\n", mds, session,
2192             ceph_session_state_name(session->s_state));
2193        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2194            session->s_state != CEPH_MDS_SESSION_HUNG) {
2195                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2196                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2197                        __open_session(mdsc, session);
2198                list_add(&req->r_wait, &session->s_waiting);
2199                goto out_session;
2200        }
2201
2202        /* send request */
2203        req->r_resend_mds = -1;   /* forget any previous mds hint */
2204
2205        if (req->r_request_started == 0)   /* note request start time */
2206                req->r_request_started = jiffies;
2207
2208        err = __prepare_send_request(mdsc, req, mds, false);
2209        if (!err) {
2210                ceph_msg_get(req->r_request);
2211                ceph_con_send(&session->s_con, req->r_request);
2212        }
2213
2214out_session:
2215        ceph_put_mds_session(session);
2216finish:
2217        if (err) {
2218                dout("__do_request early error %d\n", err);
2219                req->r_err = err;
2220                complete_request(mdsc, req);
2221                __unregister_request(mdsc, req);
2222        }
2223out:
2224        return err;
2225}
2226
2227/*
2228 * called under mdsc->mutex
2229 */
2230static void __wake_requests(struct ceph_mds_client *mdsc,
2231                            struct list_head *head)
2232{
2233        struct ceph_mds_request *req;
2234        LIST_HEAD(tmp_list);
2235
2236        list_splice_init(head, &tmp_list);
2237
2238        while (!list_empty(&tmp_list)) {
2239                req = list_entry(tmp_list.next,
2240                                 struct ceph_mds_request, r_wait);
2241                list_del_init(&req->r_wait);
2242                dout(" wake request %p tid %llu\n", req, req->r_tid);
2243                __do_request(mdsc, req);
2244        }
2245}
2246
2247/*
2248 * Wake up threads with requests pending for @mds, so that they can
2249 * resubmit their requests to a possibly different mds.
2250 */
2251static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2252{
2253        struct ceph_mds_request *req;
2254        struct rb_node *p = rb_first(&mdsc->request_tree);
2255
2256        dout("kick_requests mds%d\n", mds);
2257        while (p) {
2258                req = rb_entry(p, struct ceph_mds_request, r_node);
2259                p = rb_next(p);
2260                if (req->r_got_unsafe)
2261                        continue;
2262                if (req->r_attempts > 0)
2263                        continue; /* only new requests */
2264                if (req->r_session &&
2265                    req->r_session->s_mds == mds) {
2266                        dout(" kicking tid %llu\n", req->r_tid);
2267                        list_del_init(&req->r_wait);
2268                        __do_request(mdsc, req);
2269                }
2270        }
2271}
2272
2273void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2274                              struct ceph_mds_request *req)
2275{
2276        dout("submit_request on %p\n", req);
2277        mutex_lock(&mdsc->mutex);
2278        __register_request(mdsc, req, NULL);
2279        __do_request(mdsc, req);
2280        mutex_unlock(&mdsc->mutex);
2281}
2282
2283/*
2284 * Synchrously perform an mds request.  Take care of all of the
2285 * session setup, forwarding, retry details.
2286 */
2287int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2288                         struct inode *dir,
2289                         struct ceph_mds_request *req)
2290{
2291        int err;
2292
2293        dout("do_request on %p\n", req);
2294
2295        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
2296        if (req->r_inode)
2297                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2298        if (req->r_locked_dir)
2299                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2300        if (req->r_old_dentry_dir)
2301                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2302                                  CEPH_CAP_PIN);
2303
2304        /* issue */
2305        mutex_lock(&mdsc->mutex);
2306        __register_request(mdsc, req, dir);
2307        __do_request(mdsc, req);
2308
2309        if (req->r_err) {
2310                err = req->r_err;
2311                goto out;
2312        }
2313
2314        /* wait */
2315        mutex_unlock(&mdsc->mutex);
2316        dout("do_request waiting\n");
2317        if (!req->r_timeout && req->r_wait_for_completion) {
2318                err = req->r_wait_for_completion(mdsc, req);
2319        } else {
2320                long timeleft = wait_for_completion_killable_timeout(
2321                                        &req->r_completion,
2322                                        ceph_timeout_jiffies(req->r_timeout));
2323                if (timeleft > 0)
2324                        err = 0;
2325                else if (!timeleft)
2326                        err = -EIO;  /* timed out */
2327                else
2328                        err = timeleft;  /* killed */
2329        }
2330        dout("do_request waited, got %d\n", err);
2331        mutex_lock(&mdsc->mutex);
2332
2333        /* only abort if we didn't race with a real reply */
2334        if (req->r_got_result) {
2335                err = le32_to_cpu(req->r_reply_info.head->result);
2336        } else if (err < 0) {
2337                dout("aborted request %lld with %d\n", req->r_tid, err);
2338
2339                /*
2340                 * ensure we aren't running concurrently with
2341                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2342                 * rely on locks (dir mutex) held by our caller.
2343                 */
2344                mutex_lock(&req->r_fill_mutex);
2345                req->r_err = err;
2346                req->r_aborted = true;
2347                mutex_unlock(&req->r_fill_mutex);
2348
2349                if (req->r_locked_dir &&
2350                    (req->r_op & CEPH_MDS_OP_WRITE))
2351                        ceph_invalidate_dir_request(req);
2352        } else {
2353                err = req->r_err;
2354        }
2355
2356out:
2357        mutex_unlock(&mdsc->mutex);
2358        dout("do_request %p done, result %d\n", req, err);
2359        return err;
2360}
2361
2362/*
2363 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2364 * namespace request.
2365 */
2366void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2367{
2368        struct inode *inode = req->r_locked_dir;
2369
2370        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2371
2372        ceph_dir_clear_complete(inode);
2373        if (req->r_dentry)
2374                ceph_invalidate_dentry_lease(req->r_dentry);
2375        if (req->r_old_dentry)
2376                ceph_invalidate_dentry_lease(req->r_old_dentry);
2377}
2378
2379/*
2380 * Handle mds reply.
2381 *
2382 * We take the session mutex and parse and process the reply immediately.
2383 * This preserves the logical ordering of replies, capabilities, etc., sent
2384 * by the MDS as they are applied to our local cache.
2385 */
2386static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2387{
2388        struct ceph_mds_client *mdsc = session->s_mdsc;
2389        struct ceph_mds_request *req;
2390        struct ceph_mds_reply_head *head = msg->front.iov_base;
2391        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2392        struct ceph_snap_realm *realm;
2393        u64 tid;
2394        int err, result;
2395        int mds = session->s_mds;
2396
2397        if (msg->front.iov_len < sizeof(*head)) {
2398                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2399                ceph_msg_dump(msg);
2400                return;
2401        }
2402
2403        /* get request, session */
2404        tid = le64_to_cpu(msg->hdr.tid);
2405        mutex_lock(&mdsc->mutex);
2406        req = __lookup_request(mdsc, tid);
2407        if (!req) {
2408                dout("handle_reply on unknown tid %llu\n", tid);
2409                mutex_unlock(&mdsc->mutex);
2410                return;
2411        }
2412        dout("handle_reply %p\n", req);
2413
2414        /* correct session? */
2415        if (req->r_session != session) {
2416                pr_err("mdsc_handle_reply got %llu on session mds%d"
2417                       " not mds%d\n", tid, session->s_mds,
2418                       req->r_session ? req->r_session->s_mds : -1);
2419                mutex_unlock(&mdsc->mutex);
2420                goto out;
2421        }
2422
2423        /* dup? */
2424        if ((req->r_got_unsafe && !head->safe) ||
2425            (req->r_got_safe && head->safe)) {
2426                pr_warn("got a dup %s reply on %llu from mds%d\n",
2427                           head->safe ? "safe" : "unsafe", tid, mds);
2428                mutex_unlock(&mdsc->mutex);
2429                goto out;
2430        }
2431        if (req->r_got_safe) {
2432                pr_warn("got unsafe after safe on %llu from mds%d\n",
2433                           tid, mds);
2434                mutex_unlock(&mdsc->mutex);
2435                goto out;
2436        }
2437
2438        result = le32_to_cpu(head->result);
2439
2440        /*
2441         * Handle an ESTALE
2442         * if we're not talking to the authority, send to them
2443         * if the authority has changed while we weren't looking,
2444         * send to new authority
2445         * Otherwise we just have to return an ESTALE
2446         */
2447        if (result == -ESTALE) {
2448                dout("got ESTALE on request %llu", req->r_tid);
2449                req->r_resend_mds = -1;
2450                if (req->r_direct_mode != USE_AUTH_MDS) {
2451                        dout("not using auth, setting for that now");
2452                        req->r_direct_mode = USE_AUTH_MDS;
2453                        __do_request(mdsc, req);
2454                        mutex_unlock(&mdsc->mutex);
2455                        goto out;
2456                } else  {
2457                        int mds = __choose_mds(mdsc, req);
2458                        if (mds >= 0 && mds != req->r_session->s_mds) {
2459                                dout("but auth changed, so resending");
2460                                __do_request(mdsc, req);
2461                                mutex_unlock(&mdsc->mutex);
2462                                goto out;
2463                        }
2464                }
2465                dout("have to return ESTALE on request %llu", req->r_tid);
2466        }
2467
2468
2469        if (head->safe) {
2470                req->r_got_safe = true;
2471                __unregister_request(mdsc, req);
2472
2473                if (req->r_got_unsafe) {
2474                        /*
2475                         * We already handled the unsafe response, now do the
2476                         * cleanup.  No need to examine the response; the MDS
2477                         * doesn't include any result info in the safe
2478                         * response.  And even if it did, there is nothing
2479                         * useful we could do with a revised return value.
2480                         */
2481                        dout("got safe reply %llu, mds%d\n", tid, mds);
2482
2483                        /* last unsafe request during umount? */
2484                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2485                                complete_all(&mdsc->safe_umount_waiters);
2486                        mutex_unlock(&mdsc->mutex);
2487                        goto out;
2488                }
2489        } else {
2490                req->r_got_unsafe = true;
2491                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2492                if (req->r_unsafe_dir) {
2493                        struct ceph_inode_info *ci =
2494                                        ceph_inode(req->r_unsafe_dir);
2495                        spin_lock(&ci->i_unsafe_lock);
2496                        list_add_tail(&req->r_unsafe_dir_item,
2497                                      &ci->i_unsafe_dirops);
2498                        spin_unlock(&ci->i_unsafe_lock);
2499                }
2500        }
2501
2502        dout("handle_reply tid %lld result %d\n", tid, result);
2503        rinfo = &req->r_reply_info;
2504        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2505        mutex_unlock(&mdsc->mutex);
2506
2507        mutex_lock(&session->s_mutex);
2508        if (err < 0) {
2509                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2510                ceph_msg_dump(msg);
2511                goto out_err;
2512        }
2513
2514        /* snap trace */
2515        realm = NULL;
2516        if (rinfo->snapblob_len) {
2517                down_write(&mdsc->snap_rwsem);
2518                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2519                                rinfo->snapblob + rinfo->snapblob_len,
2520                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2521                                &realm);
2522                downgrade_write(&mdsc->snap_rwsem);
2523        } else {
2524                down_read(&mdsc->snap_rwsem);
2525        }
2526
2527        /* insert trace into our cache */
2528        mutex_lock(&req->r_fill_mutex);
2529        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2530        if (err == 0) {
2531                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2532                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2533                        ceph_readdir_prepopulate(req, req->r_session);
2534                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2535        }
2536        mutex_unlock(&req->r_fill_mutex);
2537
2538        up_read(&mdsc->snap_rwsem);
2539        if (realm)
2540                ceph_put_snap_realm(mdsc, realm);
2541
2542        if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2543                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2544                spin_lock(&ci->i_unsafe_lock);
2545                list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2546                spin_unlock(&ci->i_unsafe_lock);
2547        }
2548out_err:
2549        mutex_lock(&mdsc->mutex);
2550        if (!req->r_aborted) {
2551                if (err) {
2552                        req->r_err = err;
2553                } else {
2554                        req->r_reply =  ceph_msg_get(msg);
2555                        req->r_got_result = true;
2556                }
2557        } else {
2558                dout("reply arrived after request %lld was aborted\n", tid);
2559        }
2560        mutex_unlock(&mdsc->mutex);
2561
2562        mutex_unlock(&session->s_mutex);
2563
2564        /* kick calling process */
2565        complete_request(mdsc, req);
2566out:
2567        ceph_mdsc_put_request(req);
2568        return;
2569}
2570
2571
2572
2573/*
2574 * handle mds notification that our request has been forwarded.
2575 */
2576static void handle_forward(struct ceph_mds_client *mdsc,
2577                           struct ceph_mds_session *session,
2578                           struct ceph_msg *msg)
2579{
2580        struct ceph_mds_request *req;
2581        u64 tid = le64_to_cpu(msg->hdr.tid);
2582        u32 next_mds;
2583        u32 fwd_seq;
2584        int err = -EINVAL;
2585        void *p = msg->front.iov_base;
2586        void *end = p + msg->front.iov_len;
2587
2588        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2589        next_mds = ceph_decode_32(&p);
2590        fwd_seq = ceph_decode_32(&p);
2591
2592        mutex_lock(&mdsc->mutex);
2593        req = __lookup_request(mdsc, tid);
2594        if (!req) {
2595                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2596                goto out;  /* dup reply? */
2597        }
2598
2599        if (req->r_aborted) {
2600                dout("forward tid %llu aborted, unregistering\n", tid);
2601                __unregister_request(mdsc, req);
2602        } else if (fwd_seq <= req->r_num_fwd) {
2603                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2604                     tid, next_mds, req->r_num_fwd, fwd_seq);
2605        } else {
2606                /* resend. forward race not possible; mds would drop */
2607                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2608                BUG_ON(req->r_err);
2609                BUG_ON(req->r_got_result);
2610                req->r_attempts = 0;
2611                req->r_num_fwd = fwd_seq;
2612                req->r_resend_mds = next_mds;
2613                put_request_session(req);
2614                __do_request(mdsc, req);
2615        }
2616        ceph_mdsc_put_request(req);
2617out:
2618        mutex_unlock(&mdsc->mutex);
2619        return;
2620
2621bad:
2622        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2623}
2624
2625/*
2626 * handle a mds session control message
2627 */
2628static void handle_session(struct ceph_mds_session *session,
2629                           struct ceph_msg *msg)
2630{
2631        struct ceph_mds_client *mdsc = session->s_mdsc;
2632        u32 op;
2633        u64 seq;
2634        int mds = session->s_mds;
2635        struct ceph_mds_session_head *h = msg->front.iov_base;
2636        int wake = 0;
2637
2638        /* decode */
2639        if (msg->front.iov_len != sizeof(*h))
2640                goto bad;
2641        op = le32_to_cpu(h->op);
2642        seq = le64_to_cpu(h->seq);
2643
2644        mutex_lock(&mdsc->mutex);
2645        if (op == CEPH_SESSION_CLOSE)
2646                __unregister_session(mdsc, session);
2647        /* FIXME: this ttl calculation is generous */
2648        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2649        mutex_unlock(&mdsc->mutex);
2650
2651        mutex_lock(&session->s_mutex);
2652
2653        dout("handle_session mds%d %s %p state %s seq %llu\n",
2654             mds, ceph_session_op_name(op), session,
2655             ceph_session_state_name(session->s_state), seq);
2656
2657        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2658                session->s_state = CEPH_MDS_SESSION_OPEN;
2659                pr_info("mds%d came back\n", session->s_mds);
2660        }
2661
2662        switch (op) {
2663        case CEPH_SESSION_OPEN:
2664                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2665                        pr_info("mds%d reconnect success\n", session->s_mds);
2666                session->s_state = CEPH_MDS_SESSION_OPEN;
2667                renewed_caps(mdsc, session, 0);
2668                wake = 1;
2669                if (mdsc->stopping)
2670                        __close_session(mdsc, session);
2671                break;
2672
2673        case CEPH_SESSION_RENEWCAPS:
2674                if (session->s_renew_seq == seq)
2675                        renewed_caps(mdsc, session, 1);
2676                break;
2677
2678        case CEPH_SESSION_CLOSE:
2679                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2680                        pr_info("mds%d reconnect denied\n", session->s_mds);
2681                cleanup_session_requests(mdsc, session);
2682                remove_session_caps(session);
2683                wake = 2; /* for good measure */
2684                wake_up_all(&mdsc->session_close_wq);
2685                break;
2686
2687        case CEPH_SESSION_STALE:
2688                pr_info("mds%d caps went stale, renewing\n",
2689                        session->s_mds);
2690                spin_lock(&session->s_gen_ttl_lock);
2691                session->s_cap_gen++;
2692                session->s_cap_ttl = jiffies - 1;
2693                spin_unlock(&session->s_gen_ttl_lock);
2694                send_renew_caps(mdsc, session);
2695                break;
2696
2697        case CEPH_SESSION_RECALL_STATE:
2698                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2699                break;
2700
2701        case CEPH_SESSION_FLUSHMSG:
2702                send_flushmsg_ack(mdsc, session, seq);
2703                break;
2704
2705        case CEPH_SESSION_FORCE_RO:
2706                dout("force_session_readonly %p\n", session);
2707                spin_lock(&session->s_cap_lock);
2708                session->s_readonly = true;
2709                spin_unlock(&session->s_cap_lock);
2710                wake_up_session_caps(session, 0);
2711                break;
2712
2713        default:
2714                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2715                WARN_ON(1);
2716        }
2717
2718        mutex_unlock(&session->s_mutex);
2719        if (wake) {
2720                mutex_lock(&mdsc->mutex);
2721                __wake_requests(mdsc, &session->s_waiting);
2722                if (wake == 2)
2723                        kick_requests(mdsc, mds);
2724                mutex_unlock(&mdsc->mutex);
2725        }
2726        return;
2727
2728bad:
2729        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2730               (int)msg->front.iov_len);
2731        ceph_msg_dump(msg);
2732        return;
2733}
2734
2735
2736/*
2737 * called under session->mutex.
2738 */
2739static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2740                                   struct ceph_mds_session *session)
2741{
2742        struct ceph_mds_request *req, *nreq;
2743        struct rb_node *p;
2744        int err;
2745
2746        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2747
2748        mutex_lock(&mdsc->mutex);
2749        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2750                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2751                if (!err) {
2752                        ceph_msg_get(req->r_request);
2753                        ceph_con_send(&session->s_con, req->r_request);
2754                }
2755        }
2756
2757        /*
2758         * also re-send old requests when MDS enters reconnect stage. So that MDS
2759         * can process completed request in clientreplay stage.
2760         */
2761        p = rb_first(&mdsc->request_tree);
2762        while (p) {
2763                req = rb_entry(p, struct ceph_mds_request, r_node);
2764                p = rb_next(p);
2765                if (req->r_got_unsafe)
2766                        continue;
2767                if (req->r_attempts == 0)
2768                        continue; /* only old requests */
2769                if (req->r_session &&
2770                    req->r_session->s_mds == session->s_mds) {
2771                        err = __prepare_send_request(mdsc, req,
2772                                                     session->s_mds, true);
2773                        if (!err) {
2774                                ceph_msg_get(req->r_request);
2775                                ceph_con_send(&session->s_con, req->r_request);
2776                        }
2777                }
2778        }
2779        mutex_unlock(&mdsc->mutex);
2780}
2781
2782/*
2783 * Encode information about a cap for a reconnect with the MDS.
2784 */
2785static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2786                          void *arg)
2787{
2788        union {
2789                struct ceph_mds_cap_reconnect v2;
2790                struct ceph_mds_cap_reconnect_v1 v1;
2791        } rec;
2792        size_t reclen;
2793        struct ceph_inode_info *ci;
2794        struct ceph_reconnect_state *recon_state = arg;
2795        struct ceph_pagelist *pagelist = recon_state->pagelist;
2796        char *path;
2797        int pathlen, err;
2798        u64 pathbase;
2799        struct dentry *dentry;
2800
2801        ci = cap->ci;
2802
2803        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2804             inode, ceph_vinop(inode), cap, cap->cap_id,
2805             ceph_cap_string(cap->issued));
2806        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2807        if (err)
2808                return err;
2809
2810        dentry = d_find_alias(inode);
2811        if (dentry) {
2812                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2813                if (IS_ERR(path)) {
2814                        err = PTR_ERR(path);
2815                        goto out_dput;
2816                }
2817        } else {
2818                path = NULL;
2819                pathlen = 0;
2820        }
2821        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2822        if (err)
2823                goto out_free;
2824
2825        spin_lock(&ci->i_ceph_lock);
2826        cap->seq = 0;        /* reset cap seq */
2827        cap->issue_seq = 0;  /* and issue_seq */
2828        cap->mseq = 0;       /* and migrate_seq */
2829        cap->cap_gen = cap->session->s_cap_gen;
2830
2831        if (recon_state->flock) {
2832                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2833                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2834                rec.v2.issued = cpu_to_le32(cap->issued);
2835                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2836                rec.v2.pathbase = cpu_to_le64(pathbase);
2837                rec.v2.flock_len = 0;
2838                reclen = sizeof(rec.v2);
2839        } else {
2840                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2841                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2842                rec.v1.issued = cpu_to_le32(cap->issued);
2843                rec.v1.size = cpu_to_le64(inode->i_size);
2844                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2845                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2846                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2847                rec.v1.pathbase = cpu_to_le64(pathbase);
2848                reclen = sizeof(rec.v1);
2849        }
2850        spin_unlock(&ci->i_ceph_lock);
2851
2852        if (recon_state->flock) {
2853                int num_fcntl_locks, num_flock_locks;
2854                struct ceph_filelock *flocks;
2855
2856encode_again:
2857                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2858                flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2859                                 sizeof(struct ceph_filelock), GFP_NOFS);
2860                if (!flocks) {
2861                        err = -ENOMEM;
2862                        goto out_free;
2863                }
2864                err = ceph_encode_locks_to_buffer(inode, flocks,
2865                                                  num_fcntl_locks,
2866                                                  num_flock_locks);
2867                if (err) {
2868                        kfree(flocks);
2869                        if (err == -ENOSPC)
2870                                goto encode_again;
2871                        goto out_free;
2872                }
2873                /*
2874                 * number of encoded locks is stable, so copy to pagelist
2875                 */
2876                rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2877                                    (num_fcntl_locks+num_flock_locks) *
2878                                    sizeof(struct ceph_filelock));
2879                err = ceph_pagelist_append(pagelist, &rec, reclen);
2880                if (!err)
2881                        err = ceph_locks_to_pagelist(flocks, pagelist,
2882                                                     num_fcntl_locks,
2883                                                     num_flock_locks);
2884                kfree(flocks);
2885        } else {
2886                err = ceph_pagelist_append(pagelist, &rec, reclen);
2887        }
2888
2889        recon_state->nr_caps++;
2890out_free:
2891        kfree(path);
2892out_dput:
2893        dput(dentry);
2894        return err;
2895}
2896
2897
2898/*
2899 * If an MDS fails and recovers, clients need to reconnect in order to
2900 * reestablish shared state.  This includes all caps issued through
2901 * this session _and_ the snap_realm hierarchy.  Because it's not
2902 * clear which snap realms the mds cares about, we send everything we
2903 * know about.. that ensures we'll then get any new info the
2904 * recovering MDS might have.
2905 *
2906 * This is a relatively heavyweight operation, but it's rare.
2907 *
2908 * called with mdsc->mutex held.
2909 */
2910static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2911                               struct ceph_mds_session *session)
2912{
2913        struct ceph_msg *reply;
2914        struct rb_node *p;
2915        int mds = session->s_mds;
2916        int err = -ENOMEM;
2917        int s_nr_caps;
2918        struct ceph_pagelist *pagelist;
2919        struct ceph_reconnect_state recon_state;
2920
2921        pr_info("mds%d reconnect start\n", mds);
2922
2923        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2924        if (!pagelist)
2925                goto fail_nopagelist;
2926        ceph_pagelist_init(pagelist);
2927
2928        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2929        if (!reply)
2930                goto fail_nomsg;
2931
2932        mutex_lock(&session->s_mutex);
2933        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2934        session->s_seq = 0;
2935
2936        dout("session %p state %s\n", session,
2937             ceph_session_state_name(session->s_state));
2938
2939        spin_lock(&session->s_gen_ttl_lock);
2940        session->s_cap_gen++;
2941        spin_unlock(&session->s_gen_ttl_lock);
2942
2943        spin_lock(&session->s_cap_lock);
2944        /* don't know if session is readonly */
2945        session->s_readonly = 0;
2946        /*
2947         * notify __ceph_remove_cap() that we are composing cap reconnect.
2948         * If a cap get released before being added to the cap reconnect,
2949         * __ceph_remove_cap() should skip queuing cap release.
2950         */
2951        session->s_cap_reconnect = 1;
2952        /* drop old cap expires; we're about to reestablish that state */
2953        cleanup_cap_releases(mdsc, session);
2954
2955        /* trim unused caps to reduce MDS's cache rejoin time */
2956        if (mdsc->fsc->sb->s_root)
2957                shrink_dcache_parent(mdsc->fsc->sb->s_root);
2958
2959        ceph_con_close(&session->s_con);
2960        ceph_con_open(&session->s_con,
2961                      CEPH_ENTITY_TYPE_MDS, mds,
2962                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2963
2964        /* replay unsafe requests */
2965        replay_unsafe_requests(mdsc, session);
2966
2967        down_read(&mdsc->snap_rwsem);
2968
2969        /* traverse this session's caps */
2970        s_nr_caps = session->s_nr_caps;
2971        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2972        if (err)
2973                goto fail;
2974
2975        recon_state.nr_caps = 0;
2976        recon_state.pagelist = pagelist;
2977        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2978        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2979        if (err < 0)
2980                goto fail;
2981
2982        spin_lock(&session->s_cap_lock);
2983        session->s_cap_reconnect = 0;
2984        spin_unlock(&session->s_cap_lock);
2985
2986        /*
2987         * snaprealms.  we provide mds with the ino, seq (version), and
2988         * parent for all of our realms.  If the mds has any newer info,
2989         * it will tell us.
2990         */
2991        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2992                struct ceph_snap_realm *realm =
2993                        rb_entry(p, struct ceph_snap_realm, node);
2994                struct ceph_mds_snaprealm_reconnect sr_rec;
2995
2996                dout(" adding snap realm %llx seq %lld parent %llx\n",
2997                     realm->ino, realm->seq, realm->parent_ino);
2998                sr_rec.ino = cpu_to_le64(realm->ino);
2999                sr_rec.seq = cpu_to_le64(realm->seq);
3000                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3001                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3002                if (err)
3003                        goto fail;
3004        }
3005
3006        if (recon_state.flock)
3007                reply->hdr.version = cpu_to_le16(2);
3008
3009        /* raced with cap release? */
3010        if (s_nr_caps != recon_state.nr_caps) {
3011                struct page *page = list_first_entry(&pagelist->head,
3012                                                     struct page, lru);
3013                __le32 *addr = kmap_atomic(page);
3014                *addr = cpu_to_le32(recon_state.nr_caps);
3015                kunmap_atomic(addr);
3016        }
3017
3018        reply->hdr.data_len = cpu_to_le32(pagelist->length);
3019        ceph_msg_data_add_pagelist(reply, pagelist);
3020
3021        ceph_early_kick_flushing_caps(mdsc, session);
3022
3023        ceph_con_send(&session->s_con, reply);
3024
3025        mutex_unlock(&session->s_mutex);
3026
3027        mutex_lock(&mdsc->mutex);
3028        __wake_requests(mdsc, &session->s_waiting);
3029        mutex_unlock(&mdsc->mutex);
3030
3031        up_read(&mdsc->snap_rwsem);
3032        return;
3033
3034fail:
3035        ceph_msg_put(reply);
3036        up_read(&mdsc->snap_rwsem);
3037        mutex_unlock(&session->s_mutex);
3038fail_nomsg:
3039        ceph_pagelist_release(pagelist);
3040fail_nopagelist:
3041        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3042        return;
3043}
3044
3045
3046/*
3047 * compare old and new mdsmaps, kicking requests
3048 * and closing out old connections as necessary
3049 *
3050 * called under mdsc->mutex.
3051 */
3052static void check_new_map(struct ceph_mds_client *mdsc,
3053                          struct ceph_mdsmap *newmap,
3054                          struct ceph_mdsmap *oldmap)
3055{
3056        int i;
3057        int oldstate, newstate;
3058        struct ceph_mds_session *s;
3059
3060        dout("check_new_map new %u old %u\n",
3061             newmap->m_epoch, oldmap->m_epoch);
3062
3063        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
3064                if (mdsc->sessions[i] == NULL)
3065                        continue;
3066                s = mdsc->sessions[i];
3067                oldstate = ceph_mdsmap_get_state(oldmap, i);
3068                newstate = ceph_mdsmap_get_state(newmap, i);
3069
3070                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3071                     i, ceph_mds_state_name(oldstate),
3072                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3073                     ceph_mds_state_name(newstate),
3074                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3075                     ceph_session_state_name(s->s_state));
3076
3077                if (i >= newmap->m_max_mds ||
3078                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3079                           ceph_mdsmap_get_addr(newmap, i),
3080                           sizeof(struct ceph_entity_addr))) {
3081                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3082                                /* the session never opened, just close it
3083                                 * out now */
3084                                __wake_requests(mdsc, &s->s_waiting);
3085                                __unregister_session(mdsc, s);
3086                        } else {
3087                                /* just close it */
3088                                mutex_unlock(&mdsc->mutex);
3089                                mutex_lock(&s->s_mutex);
3090                                mutex_lock(&mdsc->mutex);
3091                                ceph_con_close(&s->s_con);
3092                                mutex_unlock(&s->s_mutex);
3093                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3094                        }
3095                } else if (oldstate == newstate) {
3096                        continue;  /* nothing new with this mds */
3097                }
3098
3099                /*
3100                 * send reconnect?
3101                 */
3102                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3103                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3104                        mutex_unlock(&mdsc->mutex);
3105                        send_mds_reconnect(mdsc, s);
3106                        mutex_lock(&mdsc->mutex);
3107                }
3108
3109                /*
3110                 * kick request on any mds that has gone active.
3111                 */
3112                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3113                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3114                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3115                            oldstate != CEPH_MDS_STATE_STARTING)
3116                                pr_info("mds%d recovery completed\n", s->s_mds);
3117                        kick_requests(mdsc, i);
3118                        ceph_kick_flushing_caps(mdsc, s);
3119                        wake_up_session_caps(s, 1);
3120                }
3121        }
3122
3123        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
3124                s = mdsc->sessions[i];
3125                if (!s)
3126                        continue;
3127                if (!ceph_mdsmap_is_laggy(newmap, i))
3128                        continue;
3129                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3130                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3131                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3132                        dout(" connecting to export targets of laggy mds%d\n",
3133                             i);
3134                        __open_export_target_sessions(mdsc, s);
3135                }
3136        }
3137}
3138
3139
3140
3141/*
3142 * leases
3143 */
3144
3145/*
3146 * caller must hold session s_mutex, dentry->d_lock
3147 */
3148void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3149{
3150        struct ceph_dentry_info *di = ceph_dentry(dentry);
3151
3152        ceph_put_mds_session(di->lease_session);
3153        di->lease_session = NULL;
3154}
3155
3156static void handle_lease(struct ceph_mds_client *mdsc,
3157                         struct ceph_mds_session *session,
3158                         struct ceph_msg *msg)
3159{
3160        struct super_block *sb = mdsc->fsc->sb;
3161        struct inode *inode;
3162        struct dentry *parent, *dentry;
3163        struct ceph_dentry_info *di;
3164        int mds = session->s_mds;
3165        struct ceph_mds_lease *h = msg->front.iov_base;
3166        u32 seq;
3167        struct ceph_vino vino;
3168        struct qstr dname;
3169        int release = 0;
3170
3171        dout("handle_lease from mds%d\n", mds);
3172
3173        /* decode */
3174        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3175                goto bad;
3176        vino.ino = le64_to_cpu(h->ino);
3177        vino.snap = CEPH_NOSNAP;
3178        seq = le32_to_cpu(h->seq);
3179        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3180        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3181        if (dname.len != get_unaligned_le32(h+1))
3182                goto bad;
3183
3184        /* lookup inode */
3185        inode = ceph_find_inode(sb, vino);
3186        dout("handle_lease %s, ino %llx %p %.*s\n",
3187             ceph_lease_op_name(h->action), vino.ino, inode,
3188             dname.len, dname.name);
3189
3190        mutex_lock(&session->s_mutex);
3191        session->s_seq++;
3192
3193        if (inode == NULL) {
3194                dout("handle_lease no inode %llx\n", vino.ino);
3195                goto release;
3196        }
3197
3198        /* dentry */
3199        parent = d_find_alias(inode);
3200        if (!parent) {
3201                dout("no parent dentry on inode %p\n", inode);
3202                WARN_ON(1);
3203                goto release;  /* hrm... */
3204        }
3205        dname.hash = full_name_hash(dname.name, dname.len);
3206        dentry = d_lookup(parent, &dname);
3207        dput(parent);
3208        if (!dentry)
3209                goto release;
3210
3211        spin_lock(&dentry->d_lock);
3212        di = ceph_dentry(dentry);
3213        switch (h->action) {
3214        case CEPH_MDS_LEASE_REVOKE:
3215                if (di->lease_session == session) {
3216                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3217                                h->seq = cpu_to_le32(di->lease_seq);
3218                        __ceph_mdsc_drop_dentry_lease(dentry);
3219                }
3220                release = 1;
3221                break;
3222
3223        case CEPH_MDS_LEASE_RENEW:
3224                if (di->lease_session == session &&
3225                    di->lease_gen == session->s_cap_gen &&
3226                    di->lease_renew_from &&
3227                    di->lease_renew_after == 0) {
3228                        unsigned long duration =
3229                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3230
3231                        di->lease_seq = seq;
3232                        dentry->d_time = di->lease_renew_from + duration;
3233                        di->lease_renew_after = di->lease_renew_from +
3234                                (duration >> 1);
3235                        di->lease_renew_from = 0;
3236                }
3237                break;
3238        }
3239        spin_unlock(&dentry->d_lock);
3240        dput(dentry);
3241
3242        if (!release)
3243                goto out;
3244
3245release:
3246        /* let's just reuse the same message */
3247        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3248        ceph_msg_get(msg);
3249        ceph_con_send(&session->s_con, msg);
3250
3251out:
3252        iput(inode);
3253        mutex_unlock(&session->s_mutex);
3254        return;
3255
3256bad:
3257        pr_err("corrupt lease message\n");
3258        ceph_msg_dump(msg);
3259}
3260
3261void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3262                              struct inode *inode,
3263                              struct dentry *dentry, char action,
3264                              u32 seq)
3265{
3266        struct ceph_msg *msg;
3267        struct ceph_mds_lease *lease;
3268        int len = sizeof(*lease) + sizeof(u32);
3269        int dnamelen = 0;
3270
3271        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3272             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3273        dnamelen = dentry->d_name.len;
3274        len += dnamelen;
3275
3276        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3277        if (!msg)
3278                return;
3279        lease = msg->front.iov_base;
3280        lease->action = action;
3281        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3282        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3283        lease->seq = cpu_to_le32(seq);
3284        put_unaligned_le32(dnamelen, lease + 1);
3285        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3286
3287        /*
3288         * if this is a preemptive lease RELEASE, no need to
3289         * flush request stream, since the actual request will
3290         * soon follow.
3291         */
3292        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3293
3294        ceph_con_send(&session->s_con, msg);
3295}
3296
3297/*
3298 * Preemptively release a lease we expect to invalidate anyway.
3299 * Pass @inode always, @dentry is optional.
3300 */
3301void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3302                             struct dentry *dentry)
3303{
3304        struct ceph_dentry_info *di;
3305        struct ceph_mds_session *session;
3306        u32 seq;
3307
3308        BUG_ON(inode == NULL);
3309        BUG_ON(dentry == NULL);
3310
3311        /* is dentry lease valid? */
3312        spin_lock(&dentry->d_lock);
3313        di = ceph_dentry(dentry);
3314        if (!di || !di->lease_session ||
3315            di->lease_session->s_mds < 0 ||
3316            di->lease_gen != di->lease_session->s_cap_gen ||
3317            !time_before(jiffies, dentry->d_time)) {
3318                dout("lease_release inode %p dentry %p -- "
3319                     "no lease\n",
3320                     inode, dentry);
3321                spin_unlock(&dentry->d_lock);
3322                return;
3323        }
3324
3325        /* we do have a lease on this dentry; note mds and seq */
3326        session = ceph_get_mds_session(di->lease_session);
3327        seq = di->lease_seq;
3328        __ceph_mdsc_drop_dentry_lease(dentry);
3329        spin_unlock(&dentry->d_lock);
3330
3331        dout("lease_release inode %p dentry %p to mds%d\n",
3332             inode, dentry, session->s_mds);
3333        ceph_mdsc_lease_send_msg(session, inode, dentry,
3334                                 CEPH_MDS_LEASE_RELEASE, seq);
3335        ceph_put_mds_session(session);
3336}
3337
3338/*
3339 * drop all leases (and dentry refs) in preparation for umount
3340 */
3341static void drop_leases(struct ceph_mds_client *mdsc)
3342{
3343        int i;
3344
3345        dout("drop_leases\n");
3346        mutex_lock(&mdsc->mutex);
3347        for (i = 0; i < mdsc->max_sessions; i++) {
3348                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3349                if (!s)
3350                        continue;
3351                mutex_unlock(&mdsc->mutex);
3352                mutex_lock(&s->s_mutex);
3353                mutex_unlock(&s->s_mutex);
3354                ceph_put_mds_session(s);
3355                mutex_lock(&mdsc->mutex);
3356        }
3357        mutex_unlock(&mdsc->mutex);
3358}
3359
3360
3361
3362/*
3363 * delayed work -- periodically trim expired leases, renew caps with mds
3364 */
3365static void schedule_delayed(struct ceph_mds_client *mdsc)
3366{
3367        int delay = 5;
3368        unsigned hz = round_jiffies_relative(HZ * delay);
3369        schedule_delayed_work(&mdsc->delayed_work, hz);
3370}
3371
3372static void delayed_work(struct work_struct *work)
3373{
3374        int i;
3375        struct ceph_mds_client *mdsc =
3376                container_of(work, struct ceph_mds_client, delayed_work.work);
3377        int renew_interval;
3378        int renew_caps;
3379
3380        dout("mdsc delayed_work\n");
3381        ceph_check_delayed_caps(mdsc);
3382
3383        mutex_lock(&mdsc->mutex);
3384        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3385        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3386                                   mdsc->last_renew_caps);
3387        if (renew_caps)
3388                mdsc->last_renew_caps = jiffies;
3389
3390        for (i = 0; i < mdsc->max_sessions; i++) {
3391                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3392                if (s == NULL)
3393                        continue;
3394                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3395                        dout("resending session close request for mds%d\n",
3396                             s->s_mds);
3397                        request_close_session(mdsc, s);
3398                        ceph_put_mds_session(s);
3399                        continue;
3400                }
3401                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3402                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3403                                s->s_state = CEPH_MDS_SESSION_HUNG;
3404                                pr_info("mds%d hung\n", s->s_mds);
3405                        }
3406                }
3407                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3408                        /* this mds is failed or recovering, just wait */
3409                        ceph_put_mds_session(s);
3410                        continue;
3411                }
3412                mutex_unlock(&mdsc->mutex);
3413
3414                mutex_lock(&s->s_mutex);
3415                if (renew_caps)
3416                        send_renew_caps(mdsc, s);
3417                else
3418                        ceph_con_keepalive(&s->s_con);
3419                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3420                    s->s_state == CEPH_MDS_SESSION_HUNG)
3421                        ceph_send_cap_releases(mdsc, s);
3422                mutex_unlock(&s->s_mutex);
3423                ceph_put_mds_session(s);
3424
3425                mutex_lock(&mdsc->mutex);
3426        }
3427        mutex_unlock(&mdsc->mutex);
3428
3429        schedule_delayed(mdsc);
3430}
3431
3432int ceph_mdsc_init(struct ceph_fs_client *fsc)
3433
3434{
3435        struct ceph_mds_client *mdsc;
3436
3437        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3438        if (!mdsc)
3439                return -ENOMEM;
3440        mdsc->fsc = fsc;
3441        fsc->mdsc = mdsc;
3442        mutex_init(&mdsc->mutex);
3443        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3444        if (mdsc->mdsmap == NULL) {
3445                kfree(mdsc);
3446                return -ENOMEM;
3447        }
3448
3449        init_completion(&mdsc->safe_umount_waiters);
3450        init_waitqueue_head(&mdsc->session_close_wq);
3451        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3452        mdsc->sessions = NULL;
3453        atomic_set(&mdsc->num_sessions, 0);
3454        mdsc->max_sessions = 0;
3455        mdsc->stopping = 0;
3456        mdsc->last_snap_seq = 0;
3457        init_rwsem(&mdsc->snap_rwsem);
3458        mdsc->snap_realms = RB_ROOT;
3459        INIT_LIST_HEAD(&mdsc->snap_empty);
3460        spin_lock_init(&mdsc->snap_empty_lock);
3461        mdsc->last_tid = 0;
3462        mdsc->oldest_tid = 0;
3463        mdsc->request_tree = RB_ROOT;
3464        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3465        mdsc->last_renew_caps = jiffies;
3466        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3467        spin_lock_init(&mdsc->cap_delay_lock);
3468        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3469        spin_lock_init(&mdsc->snap_flush_lock);
3470        mdsc->last_cap_flush_tid = 1;
3471        mdsc->cap_flush_tree = RB_ROOT;
3472        INIT_LIST_HEAD(&mdsc->cap_dirty);
3473        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3474        mdsc->num_cap_flushing = 0;
3475        spin_lock_init(&mdsc->cap_dirty_lock);
3476        init_waitqueue_head(&mdsc->cap_flushing_wq);
3477        spin_lock_init(&mdsc->dentry_lru_lock);
3478        INIT_LIST_HEAD(&mdsc->dentry_lru);
3479
3480        ceph_caps_init(mdsc);
3481        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3482
3483        init_rwsem(&mdsc->pool_perm_rwsem);
3484        mdsc->pool_perm_tree = RB_ROOT;
3485
3486        return 0;
3487}
3488
3489/*
3490 * Wait for safe replies on open mds requests.  If we time out, drop
3491 * all requests from the tree to avoid dangling dentry refs.
3492 */
3493static void wait_requests(struct ceph_mds_client *mdsc)
3494{
3495        struct ceph_options *opts = mdsc->fsc->client->options;
3496        struct ceph_mds_request *req;
3497
3498        mutex_lock(&mdsc->mutex);
3499        if (__get_oldest_req(mdsc)) {
3500                mutex_unlock(&mdsc->mutex);
3501
3502                dout("wait_requests waiting for requests\n");
3503                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3504                                    ceph_timeout_jiffies(opts->mount_timeout));
3505
3506                /* tear down remaining requests */
3507                mutex_lock(&mdsc->mutex);
3508                while ((req = __get_oldest_req(mdsc))) {
3509                        dout("wait_requests timed out on tid %llu\n",
3510                             req->r_tid);
3511                        __unregister_request(mdsc, req);
3512                }
3513        }
3514        mutex_unlock(&mdsc->mutex);
3515        dout("wait_requests done\n");
3516}
3517
3518/*
3519 * called before mount is ro, and before dentries are torn down.
3520 * (hmm, does this still race with new lookups?)
3521 */
3522void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3523{
3524        dout("pre_umount\n");
3525        mdsc->stopping = 1;
3526
3527        drop_leases(mdsc);
3528        ceph_flush_dirty_caps(mdsc);
3529        wait_requests(mdsc);
3530
3531        /*
3532         * wait for reply handlers to drop their request refs and
3533         * their inode/dcache refs
3534         */
3535        ceph_msgr_flush();
3536}
3537
3538/*
3539 * wait for all write mds requests to flush.
3540 */
3541static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3542{
3543        struct ceph_mds_request *req = NULL, *nextreq;
3544        struct rb_node *n;
3545
3546        mutex_lock(&mdsc->mutex);
3547        dout("wait_unsafe_requests want %lld\n", want_tid);
3548restart:
3549        req = __get_oldest_req(mdsc);
3550        while (req && req->r_tid <= want_tid) {
3551                /* find next request */
3552                n = rb_next(&req->r_node);
3553                if (n)
3554                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3555                else
3556                        nextreq = NULL;
3557                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3558                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3559                        /* write op */
3560                        ceph_mdsc_get_request(req);
3561                        if (nextreq)
3562                                ceph_mdsc_get_request(nextreq);
3563                        mutex_unlock(&mdsc->mutex);
3564                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3565                             req->r_tid, want_tid);
3566                        wait_for_completion(&req->r_safe_completion);
3567                        mutex_lock(&mdsc->mutex);
3568                        ceph_mdsc_put_request(req);
3569                        if (!nextreq)
3570                                break;  /* next dne before, so we're done! */
3571                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3572                                /* next request was removed from tree */
3573                                ceph_mdsc_put_request(nextreq);
3574                                goto restart;
3575                        }
3576                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3577                }
3578                req = nextreq;
3579        }
3580        mutex_unlock(&mdsc->mutex);
3581        dout("wait_unsafe_requests done\n");
3582}
3583
3584void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3585{
3586        u64 want_tid, want_flush, want_snap;
3587
3588        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3589                return;
3590
3591        dout("sync\n");
3592        mutex_lock(&mdsc->mutex);
3593        want_tid = mdsc->last_tid;
3594        mutex_unlock(&mdsc->mutex);
3595
3596        ceph_flush_dirty_caps(mdsc);
3597        spin_lock(&mdsc->cap_dirty_lock);
3598        want_flush = mdsc->last_cap_flush_tid;
3599        spin_unlock(&mdsc->cap_dirty_lock);
3600
3601        down_read(&mdsc->snap_rwsem);
3602        want_snap = mdsc->last_snap_seq;
3603        up_read(&mdsc->snap_rwsem);
3604
3605        dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
3606             want_tid, want_flush, want_snap);
3607
3608        wait_unsafe_requests(mdsc, want_tid);
3609        wait_caps_flush(mdsc, want_flush, want_snap);
3610}
3611
3612/*
3613 * true if all sessions are closed, or we force unmount
3614 */
3615static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3616{
3617        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3618                return true;
3619        return atomic_read(&mdsc->num_sessions) == 0;
3620}
3621
3622/*
3623 * called after sb is ro.
3624 */
3625void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3626{
3627        struct ceph_options *opts = mdsc->fsc->client->options;
3628        struct ceph_mds_session *session;
3629        int i;
3630
3631        dout("close_sessions\n");
3632
3633        /* close sessions */
3634        mutex_lock(&mdsc->mutex);
3635        for (i = 0; i < mdsc->max_sessions; i++) {
3636                session = __ceph_lookup_mds_session(mdsc, i);
3637                if (!session)
3638                        continue;
3639                mutex_unlock(&mdsc->mutex);
3640                mutex_lock(&session->s_mutex);
3641                __close_session(mdsc, session);
3642                mutex_unlock(&session->s_mutex);
3643                ceph_put_mds_session(session);
3644                mutex_lock(&mdsc->mutex);
3645        }
3646        mutex_unlock(&mdsc->mutex);
3647
3648        dout("waiting for sessions to close\n");
3649        wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3650                           ceph_timeout_jiffies(opts->mount_timeout));
3651
3652        /* tear down remaining sessions */
3653        mutex_lock(&mdsc->mutex);
3654        for (i = 0; i < mdsc->max_sessions; i++) {
3655                if (mdsc->sessions[i]) {
3656                        session = get_session(mdsc->sessions[i]);
3657                        __unregister_session(mdsc, session);
3658                        mutex_unlock(&mdsc->mutex);
3659                        mutex_lock(&session->s_mutex);
3660                        remove_session_caps(session);
3661                        mutex_unlock(&session->s_mutex);
3662                        ceph_put_mds_session(session);
3663                        mutex_lock(&mdsc->mutex);
3664                }
3665        }
3666        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3667        mutex_unlock(&mdsc->mutex);
3668
3669        ceph_cleanup_empty_realms(mdsc);
3670
3671        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3672
3673        dout("stopped\n");
3674}
3675
3676void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3677{
3678        struct ceph_mds_session *session;
3679        int mds;
3680
3681        dout("force umount\n");
3682
3683        mutex_lock(&mdsc->mutex);
3684        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3685                session = __ceph_lookup_mds_session(mdsc, mds);
3686                if (!session)
3687                        continue;
3688                mutex_unlock(&mdsc->mutex);
3689                mutex_lock(&session->s_mutex);
3690                __close_session(mdsc, session);
3691                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3692                        cleanup_session_requests(mdsc, session);
3693                        remove_session_caps(session);
3694                }
3695                mutex_unlock(&session->s_mutex);
3696                ceph_put_mds_session(session);
3697                mutex_lock(&mdsc->mutex);
3698                kick_requests(mdsc, mds);
3699        }
3700        __wake_requests(mdsc, &mdsc->waiting_for_map);
3701        mutex_unlock(&mdsc->mutex);
3702}
3703
3704static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3705{
3706        dout("stop\n");
3707        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3708        if (mdsc->mdsmap)
3709                ceph_mdsmap_destroy(mdsc->mdsmap);
3710        kfree(mdsc->sessions);
3711        ceph_caps_finalize(mdsc);
3712        ceph_pool_perm_destroy(mdsc);
3713}
3714
3715void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3716{
3717        struct ceph_mds_client *mdsc = fsc->mdsc;
3718
3719        dout("mdsc_destroy %p\n", mdsc);
3720        ceph_mdsc_stop(mdsc);
3721
3722        /* flush out any connection work with references to us */
3723        ceph_msgr_flush();
3724
3725        fsc->mdsc = NULL;
3726        kfree(mdsc);
3727        dout("mdsc_destroy %p done\n", mdsc);
3728}
3729
3730
3731/*
3732 * handle mds map update.
3733 */
3734void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3735{
3736        u32 epoch;
3737        u32 maplen;
3738        void *p = msg->front.iov_base;
3739        void *end = p + msg->front.iov_len;
3740        struct ceph_mdsmap *newmap, *oldmap;
3741        struct ceph_fsid fsid;
3742        int err = -EINVAL;
3743
3744        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3745        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3746        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3747                return;
3748        epoch = ceph_decode_32(&p);
3749        maplen = ceph_decode_32(&p);
3750        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3751
3752        /* do we need it? */
3753        ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3754        mutex_lock(&mdsc->mutex);
3755        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3756                dout("handle_map epoch %u <= our %u\n",
3757                     epoch, mdsc->mdsmap->m_epoch);
3758                mutex_unlock(&mdsc->mutex);
3759                return;
3760        }
3761
3762        newmap = ceph_mdsmap_decode(&p, end);
3763        if (IS_ERR(newmap)) {
3764                err = PTR_ERR(newmap);
3765                goto bad_unlock;
3766        }
3767
3768        /* swap into place */
3769        if (mdsc->mdsmap) {
3770                oldmap = mdsc->mdsmap;
3771                mdsc->mdsmap = newmap;
3772                check_new_map(mdsc, newmap, oldmap);
3773                ceph_mdsmap_destroy(oldmap);
3774        } else {
3775                mdsc->mdsmap = newmap;  /* first mds map */
3776        }
3777        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3778
3779        __wake_requests(mdsc, &mdsc->waiting_for_map);
3780
3781        mutex_unlock(&mdsc->mutex);
3782        schedule_delayed(mdsc);
3783        return;
3784
3785bad_unlock:
3786        mutex_unlock(&mdsc->mutex);
3787bad:
3788        pr_err("error decoding mdsmap %d\n", err);
3789        return;
3790}
3791
3792static struct ceph_connection *con_get(struct ceph_connection *con)
3793{
3794        struct ceph_mds_session *s = con->private;
3795
3796        if (get_session(s)) {
3797                dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3798                return con;
3799        }
3800        dout("mdsc con_get %p FAIL\n", s);
3801        return NULL;
3802}
3803
3804static void con_put(struct ceph_connection *con)
3805{
3806        struct ceph_mds_session *s = con->private;
3807
3808        dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3809        ceph_put_mds_session(s);
3810}
3811
3812/*
3813 * if the client is unresponsive for long enough, the mds will kill
3814 * the session entirely.
3815 */
3816static void peer_reset(struct ceph_connection *con)
3817{
3818        struct ceph_mds_session *s = con->private;
3819        struct ceph_mds_client *mdsc = s->s_mdsc;
3820
3821        pr_warn("mds%d closed our session\n", s->s_mds);
3822        send_mds_reconnect(mdsc, s);
3823}
3824
3825static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3826{
3827        struct ceph_mds_session *s = con->private;
3828        struct ceph_mds_client *mdsc = s->s_mdsc;
3829        int type = le16_to_cpu(msg->hdr.type);
3830
3831        mutex_lock(&mdsc->mutex);
3832        if (__verify_registered_session(mdsc, s) < 0) {
3833                mutex_unlock(&mdsc->mutex);
3834                goto out;
3835        }
3836        mutex_unlock(&mdsc->mutex);
3837
3838        switch (type) {
3839        case CEPH_MSG_MDS_MAP:
3840                ceph_mdsc_handle_map(mdsc, msg);
3841                break;
3842        case CEPH_MSG_CLIENT_SESSION:
3843                handle_session(s, msg);
3844                break;
3845        case CEPH_MSG_CLIENT_REPLY:
3846                handle_reply(s, msg);
3847                break;
3848        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3849                handle_forward(mdsc, s, msg);
3850                break;
3851        case CEPH_MSG_CLIENT_CAPS:
3852                ceph_handle_caps(s, msg);
3853                break;
3854        case CEPH_MSG_CLIENT_SNAP:
3855                ceph_handle_snap(mdsc, s, msg);
3856                break;
3857        case CEPH_MSG_CLIENT_LEASE:
3858                handle_lease(mdsc, s, msg);
3859                break;
3860
3861        default:
3862                pr_err("received unknown message type %d %s\n", type,
3863                       ceph_msg_type_name(type));
3864        }
3865out:
3866        ceph_msg_put(msg);
3867}
3868
3869/*
3870 * authentication
3871 */
3872
3873/*
3874 * Note: returned pointer is the address of a structure that's
3875 * managed separately.  Caller must *not* attempt to free it.
3876 */
3877static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3878                                        int *proto, int force_new)
3879{
3880        struct ceph_mds_session *s = con->private;
3881        struct ceph_mds_client *mdsc = s->s_mdsc;
3882        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3883        struct ceph_auth_handshake *auth = &s->s_auth;
3884
3885        if (force_new && auth->authorizer) {
3886                ceph_auth_destroy_authorizer(ac, auth->authorizer);
3887                auth->authorizer = NULL;
3888        }
3889        if (!auth->authorizer) {
3890                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3891                                                      auth);
3892                if (ret)
3893                        return ERR_PTR(ret);
3894        } else {
3895                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3896                                                      auth);
3897                if (ret)
3898                        return ERR_PTR(ret);
3899        }
3900        *proto = ac->protocol;
3901
3902        return auth;
3903}
3904
3905
3906static int verify_authorizer_reply(struct ceph_connection *con, int len)
3907{
3908        struct ceph_mds_session *s = con->private;
3909        struct ceph_mds_client *mdsc = s->s_mdsc;
3910        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3911
3912        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3913}
3914
3915static int invalidate_authorizer(struct ceph_connection *con)
3916{
3917        struct ceph_mds_session *s = con->private;
3918        struct ceph_mds_client *mdsc = s->s_mdsc;
3919        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3920
3921        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3922
3923        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3924}
3925
3926static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3927                                struct ceph_msg_header *hdr, int *skip)
3928{
3929        struct ceph_msg *msg;
3930        int type = (int) le16_to_cpu(hdr->type);
3931        int front_len = (int) le32_to_cpu(hdr->front_len);
3932
3933        if (con->in_msg)
3934                return con->in_msg;
3935
3936        *skip = 0;
3937        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3938        if (!msg) {
3939                pr_err("unable to allocate msg type %d len %d\n",
3940                       type, front_len);
3941                return NULL;
3942        }
3943
3944        return msg;
3945}
3946
3947static int mds_sign_message(struct ceph_msg *msg)
3948{
3949       struct ceph_mds_session *s = msg->con->private;
3950       struct ceph_auth_handshake *auth = &s->s_auth;
3951
3952       return ceph_auth_sign_message(auth, msg);
3953}
3954
3955static int mds_check_message_signature(struct ceph_msg *msg)
3956{
3957       struct ceph_mds_session *s = msg->con->private;
3958       struct ceph_auth_handshake *auth = &s->s_auth;
3959
3960       return ceph_auth_check_message_signature(auth, msg);
3961}
3962
3963static const struct ceph_connection_operations mds_con_ops = {
3964        .get = con_get,
3965        .put = con_put,
3966        .dispatch = dispatch,
3967        .get_authorizer = get_authorizer,
3968        .verify_authorizer_reply = verify_authorizer_reply,
3969        .invalidate_authorizer = invalidate_authorizer,
3970        .peer_reset = peer_reset,
3971        .alloc_msg = mds_alloc_msg,
3972        .sign_message = mds_sign_message,
3973        .check_message_signature = mds_check_message_signature,
3974};
3975
3976/* eof */
Note: See TracBrowser for help on using the repository browser.