source: src/linux/universal/linux-4.9/fs/ceph/mds_client.c @ 31662

Last change on this file since 31662 was 31662, checked in by brainslayer, 5 months ago

use new squashfs in all kernels

File size: 102.3 KB
Line 
1#include <linux/ceph/ceph_debug.h>
2
3#include <linux/fs.h>
4#include <linux/wait.h>
5#include <linux/slab.h>
6#include <linux/gfp.h>
7#include <linux/sched.h>
8#include <linux/debugfs.h>
9#include <linux/seq_file.h>
10#include <linux/utsname.h>
11#include <linux/ratelimit.h>
12
13#include "super.h"
14#include "mds_client.h"
15
16#include <linux/ceph/ceph_features.h>
17#include <linux/ceph/messenger.h>
18#include <linux/ceph/decode.h>
19#include <linux/ceph/pagelist.h>
20#include <linux/ceph/auth.h>
21#include <linux/ceph/debugfs.h>
22
23/*
24 * A cluster of MDS (metadata server) daemons is responsible for
25 * managing the file system namespace (the directory hierarchy and
26 * inodes) and for coordinating shared access to storage.  Metadata is
27 * partitioning hierarchically across a number of servers, and that
28 * partition varies over time as the cluster adjusts the distribution
29 * in order to balance load.
30 *
31 * The MDS client is primarily responsible to managing synchronous
32 * metadata requests for operations like open, unlink, and so forth.
33 * If there is a MDS failure, we find out about it when we (possibly
34 * request and) receive a new MDS map, and can resubmit affected
35 * requests.
36 *
37 * For the most part, though, we take advantage of a lossless
38 * communications channel to the MDS, and do not need to worry about
39 * timing out or resubmitting requests.
40 *
41 * We maintain a stateful "session" with each MDS we interact with.
42 * Within each session, we sent periodic heartbeat messages to ensure
43 * any capabilities or leases we have been issues remain valid.  If
44 * the session times out and goes stale, our leases and capabilities
45 * are no longer valid.
46 */
47
48struct ceph_reconnect_state {
49        int nr_caps;
50        struct ceph_pagelist *pagelist;
51        unsigned msg_version;
52};
53
54static void __wake_requests(struct ceph_mds_client *mdsc,
55                            struct list_head *head);
56
57static const struct ceph_connection_operations mds_con_ops;
58
59
60/*
61 * mds reply parsing
62 */
63
64/*
65 * parse individual inode info
66 */
67static int parse_reply_info_in(void **p, void *end,
68                               struct ceph_mds_reply_info_in *info,
69                               u64 features)
70{
71        int err = -EIO;
72
73        info->in = *p;
74        *p += sizeof(struct ceph_mds_reply_inode) +
75                sizeof(*info->in->fragtree.splits) *
76                le32_to_cpu(info->in->fragtree.nsplits);
77
78        ceph_decode_32_safe(p, end, info->symlink_len, bad);
79        ceph_decode_need(p, end, info->symlink_len, bad);
80        info->symlink = *p;
81        *p += info->symlink_len;
82
83        if (features & CEPH_FEATURE_DIRLAYOUTHASH)
84                ceph_decode_copy_safe(p, end, &info->dir_layout,
85                                      sizeof(info->dir_layout), bad);
86        else
87                memset(&info->dir_layout, 0, sizeof(info->dir_layout));
88
89        ceph_decode_32_safe(p, end, info->xattr_len, bad);
90        ceph_decode_need(p, end, info->xattr_len, bad);
91        info->xattr_data = *p;
92        *p += info->xattr_len;
93
94        if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
95                ceph_decode_64_safe(p, end, info->inline_version, bad);
96                ceph_decode_32_safe(p, end, info->inline_len, bad);
97                ceph_decode_need(p, end, info->inline_len, bad);
98                info->inline_data = *p;
99                *p += info->inline_len;
100        } else
101                info->inline_version = CEPH_INLINE_NONE;
102
103        info->pool_ns_len = 0;
104        info->pool_ns_data = NULL;
105        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
106                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
107                if (info->pool_ns_len > 0) {
108                        ceph_decode_need(p, end, info->pool_ns_len, bad);
109                        info->pool_ns_data = *p;
110                        *p += info->pool_ns_len;
111                }
112        }
113
114        return 0;
115bad:
116        return err;
117}
118
119/*
120 * parse a normal reply, which may contain a (dir+)dentry and/or a
121 * target inode.
122 */
123static int parse_reply_info_trace(void **p, void *end,
124                                  struct ceph_mds_reply_info_parsed *info,
125                                  u64 features)
126{
127        int err;
128
129        if (info->head->is_dentry) {
130                err = parse_reply_info_in(p, end, &info->diri, features);
131                if (err < 0)
132                        goto out_bad;
133
134                if (unlikely(*p + sizeof(*info->dirfrag) > end))
135                        goto bad;
136                info->dirfrag = *p;
137                *p += sizeof(*info->dirfrag) +
138                        sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
139                if (unlikely(*p > end))
140                        goto bad;
141
142                ceph_decode_32_safe(p, end, info->dname_len, bad);
143                ceph_decode_need(p, end, info->dname_len, bad);
144                info->dname = *p;
145                *p += info->dname_len;
146                info->dlease = *p;
147                *p += sizeof(*info->dlease);
148        }
149
150        if (info->head->is_target) {
151                err = parse_reply_info_in(p, end, &info->targeti, features);
152                if (err < 0)
153                        goto out_bad;
154        }
155
156        if (unlikely(*p != end))
157                goto bad;
158        return 0;
159
160bad:
161        err = -EIO;
162out_bad:
163        pr_err("problem parsing mds trace %d\n", err);
164        return err;
165}
166
167/*
168 * parse readdir results
169 */
170static int parse_reply_info_dir(void **p, void *end,
171                                struct ceph_mds_reply_info_parsed *info,
172                                u64 features)
173{
174        u32 num, i = 0;
175        int err;
176
177        info->dir_dir = *p;
178        if (*p + sizeof(*info->dir_dir) > end)
179                goto bad;
180        *p += sizeof(*info->dir_dir) +
181                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
182        if (*p > end)
183                goto bad;
184
185        ceph_decode_need(p, end, sizeof(num) + 2, bad);
186        num = ceph_decode_32(p);
187        {
188                u16 flags = ceph_decode_16(p);
189                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
190                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
191                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
192        }
193        if (num == 0)
194                goto done;
195
196        BUG_ON(!info->dir_entries);
197        if ((unsigned long)(info->dir_entries + num) >
198            (unsigned long)info->dir_entries + info->dir_buf_size) {
199                pr_err("dir contents are larger than expected\n");
200                WARN_ON(1);
201                goto bad;
202        }
203
204        info->dir_nr = num;
205        while (num) {
206                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
207                /* dentry */
208                ceph_decode_need(p, end, sizeof(u32)*2, bad);
209                rde->name_len = ceph_decode_32(p);
210                ceph_decode_need(p, end, rde->name_len, bad);
211                rde->name = *p;
212                *p += rde->name_len;
213                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
214                rde->lease = *p;
215                *p += sizeof(struct ceph_mds_reply_lease);
216
217                /* inode */
218                err = parse_reply_info_in(p, end, &rde->inode, features);
219                if (err < 0)
220                        goto out_bad;
221                /* ceph_readdir_prepopulate() will update it */
222                rde->offset = 0;
223                i++;
224                num--;
225        }
226
227done:
228        if (*p != end)
229                goto bad;
230        return 0;
231
232bad:
233        err = -EIO;
234out_bad:
235        pr_err("problem parsing dir contents %d\n", err);
236        return err;
237}
238
239/*
240 * parse fcntl F_GETLK results
241 */
242static int parse_reply_info_filelock(void **p, void *end,
243                                     struct ceph_mds_reply_info_parsed *info,
244                                     u64 features)
245{
246        if (*p + sizeof(*info->filelock_reply) > end)
247                goto bad;
248
249        info->filelock_reply = *p;
250        *p += sizeof(*info->filelock_reply);
251
252        if (unlikely(*p != end))
253                goto bad;
254        return 0;
255
256bad:
257        return -EIO;
258}
259
260/*
261 * parse create results
262 */
263static int parse_reply_info_create(void **p, void *end,
264                                  struct ceph_mds_reply_info_parsed *info,
265                                  u64 features)
266{
267        if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
268                if (*p == end) {
269                        info->has_create_ino = false;
270                } else {
271                        info->has_create_ino = true;
272                        info->ino = ceph_decode_64(p);
273                }
274        }
275
276        if (unlikely(*p != end))
277                goto bad;
278        return 0;
279
280bad:
281        return -EIO;
282}
283
284/*
285 * parse extra results
286 */
287static int parse_reply_info_extra(void **p, void *end,
288                                  struct ceph_mds_reply_info_parsed *info,
289                                  u64 features)
290{
291        u32 op = le32_to_cpu(info->head->op);
292
293        if (op == CEPH_MDS_OP_GETFILELOCK)
294                return parse_reply_info_filelock(p, end, info, features);
295        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
296                return parse_reply_info_dir(p, end, info, features);
297        else if (op == CEPH_MDS_OP_CREATE)
298                return parse_reply_info_create(p, end, info, features);
299        else
300                return -EIO;
301}
302
303/*
304 * parse entire mds reply
305 */
306static int parse_reply_info(struct ceph_msg *msg,
307                            struct ceph_mds_reply_info_parsed *info,
308                            u64 features)
309{
310        void *p, *end;
311        u32 len;
312        int err;
313
314        info->head = msg->front.iov_base;
315        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
316        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
317
318        /* trace */
319        ceph_decode_32_safe(&p, end, len, bad);
320        if (len > 0) {
321                ceph_decode_need(&p, end, len, bad);
322                err = parse_reply_info_trace(&p, p+len, info, features);
323                if (err < 0)
324                        goto out_bad;
325        }
326
327        /* extra */
328        ceph_decode_32_safe(&p, end, len, bad);
329        if (len > 0) {
330                ceph_decode_need(&p, end, len, bad);
331                err = parse_reply_info_extra(&p, p+len, info, features);
332                if (err < 0)
333                        goto out_bad;
334        }
335
336        /* snap blob */
337        ceph_decode_32_safe(&p, end, len, bad);
338        info->snapblob_len = len;
339        info->snapblob = p;
340        p += len;
341
342        if (p != end)
343                goto bad;
344        return 0;
345
346bad:
347        err = -EIO;
348out_bad:
349        pr_err("mds parse_reply err %d\n", err);
350        return err;
351}
352
353static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
354{
355        if (!info->dir_entries)
356                return;
357        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
358}
359
360
361/*
362 * sessions
363 */
364const char *ceph_session_state_name(int s)
365{
366        switch (s) {
367        case CEPH_MDS_SESSION_NEW: return "new";
368        case CEPH_MDS_SESSION_OPENING: return "opening";
369        case CEPH_MDS_SESSION_OPEN: return "open";
370        case CEPH_MDS_SESSION_HUNG: return "hung";
371        case CEPH_MDS_SESSION_CLOSING: return "closing";
372        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
373        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
374        case CEPH_MDS_SESSION_REJECTED: return "rejected";
375        default: return "???";
376        }
377}
378
379static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
380{
381        if (atomic_inc_not_zero(&s->s_ref)) {
382                dout("mdsc get_session %p %d -> %d\n", s,
383                     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
384                return s;
385        } else {
386                dout("mdsc get_session %p 0 -- FAIL", s);
387                return NULL;
388        }
389}
390
391void ceph_put_mds_session(struct ceph_mds_session *s)
392{
393        dout("mdsc put_session %p %d -> %d\n", s,
394             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
395        if (atomic_dec_and_test(&s->s_ref)) {
396                if (s->s_auth.authorizer)
397                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
398                kfree(s);
399        }
400}
401
402/*
403 * called under mdsc->mutex
404 */
405struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
406                                                   int mds)
407{
408        struct ceph_mds_session *session;
409
410        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
411                return NULL;
412        session = mdsc->sessions[mds];
413        dout("lookup_mds_session %p %d\n", session,
414             atomic_read(&session->s_ref));
415        get_session(session);
416        return session;
417}
418
419static bool __have_session(struct ceph_mds_client *mdsc, int mds)
420{
421        if (mds >= mdsc->max_sessions)
422                return false;
423        return mdsc->sessions[mds];
424}
425
426static int __verify_registered_session(struct ceph_mds_client *mdsc,
427                                       struct ceph_mds_session *s)
428{
429        if (s->s_mds >= mdsc->max_sessions ||
430            mdsc->sessions[s->s_mds] != s)
431                return -ENOENT;
432        return 0;
433}
434
435/*
436 * create+register a new session for given mds.
437 * called under mdsc->mutex.
438 */
439static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
440                                                 int mds)
441{
442        struct ceph_mds_session *s;
443
444        if (mds >= mdsc->mdsmap->m_max_mds)
445                return ERR_PTR(-EINVAL);
446
447        s = kzalloc(sizeof(*s), GFP_NOFS);
448        if (!s)
449                return ERR_PTR(-ENOMEM);
450        s->s_mdsc = mdsc;
451        s->s_mds = mds;
452        s->s_state = CEPH_MDS_SESSION_NEW;
453        s->s_ttl = 0;
454        s->s_seq = 0;
455        mutex_init(&s->s_mutex);
456
457        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
458
459        spin_lock_init(&s->s_gen_ttl_lock);
460        s->s_cap_gen = 0;
461        s->s_cap_ttl = jiffies - 1;
462
463        spin_lock_init(&s->s_cap_lock);
464        s->s_renew_requested = 0;
465        s->s_renew_seq = 0;
466        INIT_LIST_HEAD(&s->s_caps);
467        s->s_nr_caps = 0;
468        s->s_trim_caps = 0;
469        atomic_set(&s->s_ref, 1);
470        INIT_LIST_HEAD(&s->s_waiting);
471        INIT_LIST_HEAD(&s->s_unsafe);
472        s->s_num_cap_releases = 0;
473        s->s_cap_reconnect = 0;
474        s->s_cap_iterator = NULL;
475        INIT_LIST_HEAD(&s->s_cap_releases);
476        INIT_LIST_HEAD(&s->s_cap_flushing);
477
478        dout("register_session mds%d\n", mds);
479        if (mds >= mdsc->max_sessions) {
480                int newmax = 1 << get_count_order(mds+1);
481                struct ceph_mds_session **sa;
482
483                dout("register_session realloc to %d\n", newmax);
484                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
485                if (sa == NULL)
486                        goto fail_realloc;
487                if (mdsc->sessions) {
488                        memcpy(sa, mdsc->sessions,
489                               mdsc->max_sessions * sizeof(void *));
490                        kfree(mdsc->sessions);
491                }
492                mdsc->sessions = sa;
493                mdsc->max_sessions = newmax;
494        }
495        mdsc->sessions[mds] = s;
496        atomic_inc(&mdsc->num_sessions);
497        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
498
499        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
500                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
501
502        return s;
503
504fail_realloc:
505        kfree(s);
506        return ERR_PTR(-ENOMEM);
507}
508
509/*
510 * called under mdsc->mutex
511 */
512static void __unregister_session(struct ceph_mds_client *mdsc,
513                               struct ceph_mds_session *s)
514{
515        dout("__unregister_session mds%d %p\n", s->s_mds, s);
516        BUG_ON(mdsc->sessions[s->s_mds] != s);
517        mdsc->sessions[s->s_mds] = NULL;
518        ceph_con_close(&s->s_con);
519        ceph_put_mds_session(s);
520        atomic_dec(&mdsc->num_sessions);
521}
522
523/*
524 * drop session refs in request.
525 *
526 * should be last request ref, or hold mdsc->mutex
527 */
528static void put_request_session(struct ceph_mds_request *req)
529{
530        if (req->r_session) {
531                ceph_put_mds_session(req->r_session);
532                req->r_session = NULL;
533        }
534}
535
536void ceph_mdsc_release_request(struct kref *kref)
537{
538        struct ceph_mds_request *req = container_of(kref,
539                                                    struct ceph_mds_request,
540                                                    r_kref);
541        destroy_reply_info(&req->r_reply_info);
542        if (req->r_request)
543                ceph_msg_put(req->r_request);
544        if (req->r_reply)
545                ceph_msg_put(req->r_reply);
546        if (req->r_inode) {
547                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
548                iput(req->r_inode);
549        }
550        if (req->r_locked_dir)
551                ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
552        iput(req->r_target_inode);
553        if (req->r_dentry)
554                dput(req->r_dentry);
555        if (req->r_old_dentry)
556                dput(req->r_old_dentry);
557        if (req->r_old_dentry_dir) {
558                /*
559                 * track (and drop pins for) r_old_dentry_dir
560                 * separately, since r_old_dentry's d_parent may have
561                 * changed between the dir mutex being dropped and
562                 * this request being freed.
563                 */
564                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
565                                  CEPH_CAP_PIN);
566                iput(req->r_old_dentry_dir);
567        }
568        kfree(req->r_path1);
569        kfree(req->r_path2);
570        if (req->r_pagelist)
571                ceph_pagelist_release(req->r_pagelist);
572        put_request_session(req);
573        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
574        kfree(req);
575}
576
577DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
578
579/*
580 * lookup session, bump ref if found.
581 *
582 * called under mdsc->mutex.
583 */
584static struct ceph_mds_request *
585lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
586{
587        struct ceph_mds_request *req;
588
589        req = lookup_request(&mdsc->request_tree, tid);
590        if (req)
591                ceph_mdsc_get_request(req);
592
593        return req;
594}
595
596/*
597 * Register an in-flight request, and assign a tid.  Link to directory
598 * are modifying (if any).
599 *
600 * Called under mdsc->mutex.
601 */
602static void __register_request(struct ceph_mds_client *mdsc,
603                               struct ceph_mds_request *req,
604                               struct inode *dir)
605{
606        req->r_tid = ++mdsc->last_tid;
607        if (req->r_num_caps)
608                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
609                                  req->r_num_caps);
610        dout("__register_request %p tid %lld\n", req, req->r_tid);
611        ceph_mdsc_get_request(req);
612        insert_request(&mdsc->request_tree, req);
613
614        req->r_uid = current_fsuid();
615        req->r_gid = current_fsgid();
616
617        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
618                mdsc->oldest_tid = req->r_tid;
619
620        if (dir) {
621                ihold(dir);
622                req->r_unsafe_dir = dir;
623        }
624}
625
626static void __unregister_request(struct ceph_mds_client *mdsc,
627                                 struct ceph_mds_request *req)
628{
629        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
630
631        /* Never leave an unregistered request on an unsafe list! */
632        list_del_init(&req->r_unsafe_item);
633
634        if (req->r_tid == mdsc->oldest_tid) {
635                struct rb_node *p = rb_next(&req->r_node);
636                mdsc->oldest_tid = 0;
637                while (p) {
638                        struct ceph_mds_request *next_req =
639                                rb_entry(p, struct ceph_mds_request, r_node);
640                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
641                                mdsc->oldest_tid = next_req->r_tid;
642                                break;
643                        }
644                        p = rb_next(p);
645                }
646        }
647
648        erase_request(&mdsc->request_tree, req);
649
650        if (req->r_unsafe_dir && req->r_got_unsafe) {
651                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
652                spin_lock(&ci->i_unsafe_lock);
653                list_del_init(&req->r_unsafe_dir_item);
654                spin_unlock(&ci->i_unsafe_lock);
655        }
656        if (req->r_target_inode && req->r_got_unsafe) {
657                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
658                spin_lock(&ci->i_unsafe_lock);
659                list_del_init(&req->r_unsafe_target_item);
660                spin_unlock(&ci->i_unsafe_lock);
661        }
662
663        if (req->r_unsafe_dir) {
664                iput(req->r_unsafe_dir);
665                req->r_unsafe_dir = NULL;
666        }
667
668        complete_all(&req->r_safe_completion);
669
670        ceph_mdsc_put_request(req);
671}
672
673/*
674 * Choose mds to send request to next.  If there is a hint set in the
675 * request (e.g., due to a prior forward hint from the mds), use that.
676 * Otherwise, consult frag tree and/or caps to identify the
677 * appropriate mds.  If all else fails, choose randomly.
678 *
679 * Called under mdsc->mutex.
680 */
681static struct dentry *get_nonsnap_parent(struct dentry *dentry)
682{
683        /*
684         * we don't need to worry about protecting the d_parent access
685         * here because we never renaming inside the snapped namespace
686         * except to resplice to another snapdir, and either the old or new
687         * result is a valid result.
688         */
689        while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
690                dentry = dentry->d_parent;
691        return dentry;
692}
693
694static int __choose_mds(struct ceph_mds_client *mdsc,
695                        struct ceph_mds_request *req)
696{
697        struct inode *inode;
698        struct ceph_inode_info *ci;
699        struct ceph_cap *cap;
700        int mode = req->r_direct_mode;
701        int mds = -1;
702        u32 hash = req->r_direct_hash;
703        bool is_hash = req->r_direct_is_hash;
704
705        /*
706         * is there a specific mds we should try?  ignore hint if we have
707         * no session and the mds is not up (active or recovering).
708         */
709        if (req->r_resend_mds >= 0 &&
710            (__have_session(mdsc, req->r_resend_mds) ||
711             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
712                dout("choose_mds using resend_mds mds%d\n",
713                     req->r_resend_mds);
714                return req->r_resend_mds;
715        }
716
717        if (mode == USE_RANDOM_MDS)
718                goto random;
719
720        inode = NULL;
721        if (req->r_inode) {
722                inode = req->r_inode;
723        } else if (req->r_dentry) {
724                /* ignore race with rename; old or new d_parent is okay */
725                struct dentry *parent = req->r_dentry->d_parent;
726                struct inode *dir = d_inode(parent);
727
728                if (dir->i_sb != mdsc->fsc->sb) {
729                        /* not this fs! */
730                        inode = d_inode(req->r_dentry);
731                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
732                        /* direct snapped/virtual snapdir requests
733                         * based on parent dir inode */
734                        struct dentry *dn = get_nonsnap_parent(parent);
735                        inode = d_inode(dn);
736                        dout("__choose_mds using nonsnap parent %p\n", inode);
737                } else {
738                        /* dentry target */
739                        inode = d_inode(req->r_dentry);
740                        if (!inode || mode == USE_AUTH_MDS) {
741                                /* dir + name */
742                                inode = dir;
743                                hash = ceph_dentry_hash(dir, req->r_dentry);
744                                is_hash = true;
745                        }
746                }
747        }
748
749        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
750             (int)hash, mode);
751        if (!inode)
752                goto random;
753        ci = ceph_inode(inode);
754
755        if (is_hash && S_ISDIR(inode->i_mode)) {
756                struct ceph_inode_frag frag;
757                int found;
758
759                ceph_choose_frag(ci, hash, &frag, &found);
760                if (found) {
761                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
762                                u8 r;
763
764                                /* choose a random replica */
765                                get_random_bytes(&r, 1);
766                                r %= frag.ndist;
767                                mds = frag.dist[r];
768                                dout("choose_mds %p %llx.%llx "
769                                     "frag %u mds%d (%d/%d)\n",
770                                     inode, ceph_vinop(inode),
771                                     frag.frag, mds,
772                                     (int)r, frag.ndist);
773                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
774                                    CEPH_MDS_STATE_ACTIVE)
775                                        return mds;
776                        }
777
778                        /* since this file/dir wasn't known to be
779                         * replicated, then we want to look for the
780                         * authoritative mds. */
781                        mode = USE_AUTH_MDS;
782                        if (frag.mds >= 0) {
783                                /* choose auth mds */
784                                mds = frag.mds;
785                                dout("choose_mds %p %llx.%llx "
786                                     "frag %u mds%d (auth)\n",
787                                     inode, ceph_vinop(inode), frag.frag, mds);
788                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
789                                    CEPH_MDS_STATE_ACTIVE)
790                                        return mds;
791                        }
792                }
793        }
794
795        spin_lock(&ci->i_ceph_lock);
796        cap = NULL;
797        if (mode == USE_AUTH_MDS)
798                cap = ci->i_auth_cap;
799        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
800                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
801        if (!cap) {
802                spin_unlock(&ci->i_ceph_lock);
803                goto random;
804        }
805        mds = cap->session->s_mds;
806        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
807             inode, ceph_vinop(inode), mds,
808             cap == ci->i_auth_cap ? "auth " : "", cap);
809        spin_unlock(&ci->i_ceph_lock);
810        return mds;
811
812random:
813        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
814        dout("choose_mds chose random mds%d\n", mds);
815        return mds;
816}
817
818
819/*
820 * session messages
821 */
822static struct ceph_msg *create_session_msg(u32 op, u64 seq)
823{
824        struct ceph_msg *msg;
825        struct ceph_mds_session_head *h;
826
827        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
828                           false);
829        if (!msg) {
830                pr_err("create_session_msg ENOMEM creating msg\n");
831                return NULL;
832        }
833        h = msg->front.iov_base;
834        h->op = cpu_to_le32(op);
835        h->seq = cpu_to_le64(seq);
836
837        return msg;
838}
839
840/*
841 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
842 * to include additional client metadata fields.
843 */
844static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
845{
846        struct ceph_msg *msg;
847        struct ceph_mds_session_head *h;
848        int i = -1;
849        int metadata_bytes = 0;
850        int metadata_key_count = 0;
851        struct ceph_options *opt = mdsc->fsc->client->options;
852        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
853        void *p;
854
855        const char* metadata[][2] = {
856                {"hostname", utsname()->nodename},
857                {"kernel_version", utsname()->release},
858                {"entity_id", opt->name ? : ""},
859                {"root", fsopt->server_path ? : "/"},
860                {NULL, NULL}
861        };
862
863        /* Calculate serialized length of metadata */
864        metadata_bytes = 4;  /* map length */
865        for (i = 0; metadata[i][0] != NULL; ++i) {
866                metadata_bytes += 8 + strlen(metadata[i][0]) +
867                        strlen(metadata[i][1]);
868                metadata_key_count++;
869        }
870
871        /* Allocate the message */
872        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
873                           GFP_NOFS, false);
874        if (!msg) {
875                pr_err("create_session_msg ENOMEM creating msg\n");
876                return NULL;
877        }
878        h = msg->front.iov_base;
879        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
880        h->seq = cpu_to_le64(seq);
881
882        /*
883         * Serialize client metadata into waiting buffer space, using
884         * the format that userspace expects for map<string, string>
885         *
886         * ClientSession messages with metadata are v2
887         */
888        msg->hdr.version = cpu_to_le16(2);
889        msg->hdr.compat_version = cpu_to_le16(1);
890
891        /* The write pointer, following the session_head structure */
892        p = msg->front.iov_base + sizeof(*h);
893
894        /* Number of entries in the map */
895        ceph_encode_32(&p, metadata_key_count);
896
897        /* Two length-prefixed strings for each entry in the map */
898        for (i = 0; metadata[i][0] != NULL; ++i) {
899                size_t const key_len = strlen(metadata[i][0]);
900                size_t const val_len = strlen(metadata[i][1]);
901
902                ceph_encode_32(&p, key_len);
903                memcpy(p, metadata[i][0], key_len);
904                p += key_len;
905                ceph_encode_32(&p, val_len);
906                memcpy(p, metadata[i][1], val_len);
907                p += val_len;
908        }
909
910        return msg;
911}
912
913/*
914 * send session open request.
915 *
916 * called under mdsc->mutex
917 */
918static int __open_session(struct ceph_mds_client *mdsc,
919                          struct ceph_mds_session *session)
920{
921        struct ceph_msg *msg;
922        int mstate;
923        int mds = session->s_mds;
924
925        /* wait for mds to go active? */
926        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
927        dout("open_session to mds%d (%s)\n", mds,
928             ceph_mds_state_name(mstate));
929        session->s_state = CEPH_MDS_SESSION_OPENING;
930        session->s_renew_requested = jiffies;
931
932        /* send connect message */
933        msg = create_session_open_msg(mdsc, session->s_seq);
934        if (!msg)
935                return -ENOMEM;
936        ceph_con_send(&session->s_con, msg);
937        return 0;
938}
939
940/*
941 * open sessions for any export targets for the given mds
942 *
943 * called under mdsc->mutex
944 */
945static struct ceph_mds_session *
946__open_export_target_session(struct ceph_mds_client *mdsc, int target)
947{
948        struct ceph_mds_session *session;
949
950        session = __ceph_lookup_mds_session(mdsc, target);
951        if (!session) {
952                session = register_session(mdsc, target);
953                if (IS_ERR(session))
954                        return session;
955        }
956        if (session->s_state == CEPH_MDS_SESSION_NEW ||
957            session->s_state == CEPH_MDS_SESSION_CLOSING)
958                __open_session(mdsc, session);
959
960        return session;
961}
962
963struct ceph_mds_session *
964ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
965{
966        struct ceph_mds_session *session;
967
968        dout("open_export_target_session to mds%d\n", target);
969
970        mutex_lock(&mdsc->mutex);
971        session = __open_export_target_session(mdsc, target);
972        mutex_unlock(&mdsc->mutex);
973
974        return session;
975}
976
977static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
978                                          struct ceph_mds_session *session)
979{
980        struct ceph_mds_info *mi;
981        struct ceph_mds_session *ts;
982        int i, mds = session->s_mds;
983
984        if (mds >= mdsc->mdsmap->m_max_mds)
985                return;
986
987        mi = &mdsc->mdsmap->m_info[mds];
988        dout("open_export_target_sessions for mds%d (%d targets)\n",
989             session->s_mds, mi->num_export_targets);
990
991        for (i = 0; i < mi->num_export_targets; i++) {
992                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
993                if (!IS_ERR(ts))
994                        ceph_put_mds_session(ts);
995        }
996}
997
998void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
999                                           struct ceph_mds_session *session)
1000{
1001        mutex_lock(&mdsc->mutex);
1002        __open_export_target_sessions(mdsc, session);
1003        mutex_unlock(&mdsc->mutex);
1004}
1005
1006/*
1007 * session caps
1008 */
1009
1010/* caller holds s_cap_lock, we drop it */
1011static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
1012                                 struct ceph_mds_session *session)
1013        __releases(session->s_cap_lock)
1014{
1015        LIST_HEAD(tmp_list);
1016        list_splice_init(&session->s_cap_releases, &tmp_list);
1017        session->s_num_cap_releases = 0;
1018        spin_unlock(&session->s_cap_lock);
1019
1020        dout("cleanup_cap_releases mds%d\n", session->s_mds);
1021        while (!list_empty(&tmp_list)) {
1022                struct ceph_cap *cap;
1023                /* zero out the in-progress message */
1024                cap = list_first_entry(&tmp_list,
1025                                        struct ceph_cap, session_caps);
1026                list_del(&cap->session_caps);
1027                ceph_put_cap(mdsc, cap);
1028        }
1029}
1030
1031static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1032                                     struct ceph_mds_session *session)
1033{
1034        struct ceph_mds_request *req;
1035        struct rb_node *p;
1036
1037        dout("cleanup_session_requests mds%d\n", session->s_mds);
1038        mutex_lock(&mdsc->mutex);
1039        while (!list_empty(&session->s_unsafe)) {
1040                req = list_first_entry(&session->s_unsafe,
1041                                       struct ceph_mds_request, r_unsafe_item);
1042                pr_warn_ratelimited(" dropping unsafe request %llu\n",
1043                                    req->r_tid);
1044                __unregister_request(mdsc, req);
1045        }
1046        /* zero r_attempts, so kick_requests() will re-send requests */
1047        p = rb_first(&mdsc->request_tree);
1048        while (p) {
1049                req = rb_entry(p, struct ceph_mds_request, r_node);
1050                p = rb_next(p);
1051                if (req->r_session &&
1052                    req->r_session->s_mds == session->s_mds)
1053                        req->r_attempts = 0;
1054        }
1055        mutex_unlock(&mdsc->mutex);
1056}
1057
1058/*
1059 * Helper to safely iterate over all caps associated with a session, with
1060 * special care taken to handle a racing __ceph_remove_cap().
1061 *
1062 * Caller must hold session s_mutex.
1063 */
1064static int iterate_session_caps(struct ceph_mds_session *session,
1065                                 int (*cb)(struct inode *, struct ceph_cap *,
1066                                            void *), void *arg)
1067{
1068        struct list_head *p;
1069        struct ceph_cap *cap;
1070        struct inode *inode, *last_inode = NULL;
1071        struct ceph_cap *old_cap = NULL;
1072        int ret;
1073
1074        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1075        spin_lock(&session->s_cap_lock);
1076        p = session->s_caps.next;
1077        while (p != &session->s_caps) {
1078                cap = list_entry(p, struct ceph_cap, session_caps);
1079                inode = igrab(&cap->ci->vfs_inode);
1080                if (!inode) {
1081                        p = p->next;
1082                        continue;
1083                }
1084                session->s_cap_iterator = cap;
1085                spin_unlock(&session->s_cap_lock);
1086
1087                if (last_inode) {
1088                        iput(last_inode);
1089                        last_inode = NULL;
1090                }
1091                if (old_cap) {
1092                        ceph_put_cap(session->s_mdsc, old_cap);
1093                        old_cap = NULL;
1094                }
1095
1096                ret = cb(inode, cap, arg);
1097                last_inode = inode;
1098
1099                spin_lock(&session->s_cap_lock);
1100                p = p->next;
1101                if (cap->ci == NULL) {
1102                        dout("iterate_session_caps  finishing cap %p removal\n",
1103                             cap);
1104                        BUG_ON(cap->session != session);
1105                        cap->session = NULL;
1106                        list_del_init(&cap->session_caps);
1107                        session->s_nr_caps--;
1108                        if (cap->queue_release) {
1109                                list_add_tail(&cap->session_caps,
1110                                              &session->s_cap_releases);
1111                                session->s_num_cap_releases++;
1112                        } else {
1113                                old_cap = cap;  /* put_cap it w/o locks held */
1114                        }
1115                }
1116                if (ret < 0)
1117                        goto out;
1118        }
1119        ret = 0;
1120out:
1121        session->s_cap_iterator = NULL;
1122        spin_unlock(&session->s_cap_lock);
1123
1124        iput(last_inode);
1125        if (old_cap)
1126                ceph_put_cap(session->s_mdsc, old_cap);
1127
1128        return ret;
1129}
1130
1131static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1132                                  void *arg)
1133{
1134        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1135        struct ceph_inode_info *ci = ceph_inode(inode);
1136        LIST_HEAD(to_remove);
1137        bool drop = false;
1138        bool invalidate = false;
1139
1140        dout("removing cap %p, ci is %p, inode is %p\n",
1141             cap, ci, &ci->vfs_inode);
1142        spin_lock(&ci->i_ceph_lock);
1143        __ceph_remove_cap(cap, false);
1144        if (!ci->i_auth_cap) {
1145                struct ceph_cap_flush *cf;
1146                struct ceph_mds_client *mdsc = fsc->mdsc;
1147
1148                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1149
1150                if (ci->i_wrbuffer_ref > 0 &&
1151                    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1152                        invalidate = true;
1153
1154                while (!list_empty(&ci->i_cap_flush_list)) {
1155                        cf = list_first_entry(&ci->i_cap_flush_list,
1156                                              struct ceph_cap_flush, i_list);
1157                        list_move(&cf->i_list, &to_remove);
1158                }
1159
1160                spin_lock(&mdsc->cap_dirty_lock);
1161
1162                list_for_each_entry(cf, &to_remove, i_list)
1163                        list_del(&cf->g_list);
1164
1165                if (!list_empty(&ci->i_dirty_item)) {
1166                        pr_warn_ratelimited(
1167                                " dropping dirty %s state for %p %lld\n",
1168                                ceph_cap_string(ci->i_dirty_caps),
1169                                inode, ceph_ino(inode));
1170                        ci->i_dirty_caps = 0;
1171                        list_del_init(&ci->i_dirty_item);
1172                        drop = true;
1173                }
1174                if (!list_empty(&ci->i_flushing_item)) {
1175                        pr_warn_ratelimited(
1176                                " dropping dirty+flushing %s state for %p %lld\n",
1177                                ceph_cap_string(ci->i_flushing_caps),
1178                                inode, ceph_ino(inode));
1179                        ci->i_flushing_caps = 0;
1180                        list_del_init(&ci->i_flushing_item);
1181                        mdsc->num_cap_flushing--;
1182                        drop = true;
1183                }
1184                spin_unlock(&mdsc->cap_dirty_lock);
1185
1186                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1187                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1188                        ci->i_prealloc_cap_flush = NULL;
1189                }
1190        }
1191        spin_unlock(&ci->i_ceph_lock);
1192        while (!list_empty(&to_remove)) {
1193                struct ceph_cap_flush *cf;
1194                cf = list_first_entry(&to_remove,
1195                                      struct ceph_cap_flush, i_list);
1196                list_del(&cf->i_list);
1197                ceph_free_cap_flush(cf);
1198        }
1199
1200        wake_up_all(&ci->i_cap_wq);
1201        if (invalidate)
1202                ceph_queue_invalidate(inode);
1203        if (drop)
1204                iput(inode);
1205        return 0;
1206}
1207
1208/*
1209 * caller must hold session s_mutex
1210 */
1211static void remove_session_caps(struct ceph_mds_session *session)
1212{
1213        struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1214        struct super_block *sb = fsc->sb;
1215        dout("remove_session_caps on %p\n", session);
1216        iterate_session_caps(session, remove_session_caps_cb, fsc);
1217
1218        wake_up_all(&fsc->mdsc->cap_flushing_wq);
1219
1220        spin_lock(&session->s_cap_lock);
1221        if (session->s_nr_caps > 0) {
1222                struct inode *inode;
1223                struct ceph_cap *cap, *prev = NULL;
1224                struct ceph_vino vino;
1225                /*
1226                 * iterate_session_caps() skips inodes that are being
1227                 * deleted, we need to wait until deletions are complete.
1228                 * __wait_on_freeing_inode() is designed for the job,
1229                 * but it is not exported, so use lookup inode function
1230                 * to access it.
1231                 */
1232                while (!list_empty(&session->s_caps)) {
1233                        cap = list_entry(session->s_caps.next,
1234                                         struct ceph_cap, session_caps);
1235                        if (cap == prev)
1236                                break;
1237                        prev = cap;
1238                        vino = cap->ci->i_vino;
1239                        spin_unlock(&session->s_cap_lock);
1240
1241                        inode = ceph_find_inode(sb, vino);
1242                        iput(inode);
1243
1244                        spin_lock(&session->s_cap_lock);
1245                }
1246        }
1247
1248        // drop cap expires and unlock s_cap_lock
1249        cleanup_cap_releases(session->s_mdsc, session);
1250
1251        BUG_ON(session->s_nr_caps > 0);
1252        BUG_ON(!list_empty(&session->s_cap_flushing));
1253}
1254
1255/*
1256 * wake up any threads waiting on this session's caps.  if the cap is
1257 * old (didn't get renewed on the client reconnect), remove it now.
1258 *
1259 * caller must hold s_mutex.
1260 */
1261static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1262                              void *arg)
1263{
1264        struct ceph_inode_info *ci = ceph_inode(inode);
1265
1266        if (arg) {
1267                spin_lock(&ci->i_ceph_lock);
1268                ci->i_wanted_max_size = 0;
1269                ci->i_requested_max_size = 0;
1270                spin_unlock(&ci->i_ceph_lock);
1271        }
1272        wake_up_all(&ci->i_cap_wq);
1273        return 0;
1274}
1275
1276static void wake_up_session_caps(struct ceph_mds_session *session,
1277                                 int reconnect)
1278{
1279        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1280        iterate_session_caps(session, wake_up_session_cb,
1281                             (void *)(unsigned long)reconnect);
1282}
1283
1284/*
1285 * Send periodic message to MDS renewing all currently held caps.  The
1286 * ack will reset the expiration for all caps from this session.
1287 *
1288 * caller holds s_mutex
1289 */
1290static int send_renew_caps(struct ceph_mds_client *mdsc,
1291                           struct ceph_mds_session *session)
1292{
1293        struct ceph_msg *msg;
1294        int state;
1295
1296        if (time_after_eq(jiffies, session->s_cap_ttl) &&
1297            time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1298                pr_info("mds%d caps stale\n", session->s_mds);
1299        session->s_renew_requested = jiffies;
1300
1301        /* do not try to renew caps until a recovering mds has reconnected
1302         * with its clients. */
1303        state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1304        if (state < CEPH_MDS_STATE_RECONNECT) {
1305                dout("send_renew_caps ignoring mds%d (%s)\n",
1306                     session->s_mds, ceph_mds_state_name(state));
1307                return 0;
1308        }
1309
1310        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1311                ceph_mds_state_name(state));
1312        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1313                                 ++session->s_renew_seq);
1314        if (!msg)
1315                return -ENOMEM;
1316        ceph_con_send(&session->s_con, msg);
1317        return 0;
1318}
1319
1320static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1321                             struct ceph_mds_session *session, u64 seq)
1322{
1323        struct ceph_msg *msg;
1324
1325        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1326             session->s_mds, ceph_session_state_name(session->s_state), seq);
1327        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1328        if (!msg)
1329                return -ENOMEM;
1330        ceph_con_send(&session->s_con, msg);
1331        return 0;
1332}
1333
1334
1335/*
1336 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1337 *
1338 * Called under session->s_mutex
1339 */
1340static void renewed_caps(struct ceph_mds_client *mdsc,
1341                         struct ceph_mds_session *session, int is_renew)
1342{
1343        int was_stale;
1344        int wake = 0;
1345
1346        spin_lock(&session->s_cap_lock);
1347        was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1348
1349        session->s_cap_ttl = session->s_renew_requested +
1350                mdsc->mdsmap->m_session_timeout*HZ;
1351
1352        if (was_stale) {
1353                if (time_before(jiffies, session->s_cap_ttl)) {
1354                        pr_info("mds%d caps renewed\n", session->s_mds);
1355                        wake = 1;
1356                } else {
1357                        pr_info("mds%d caps still stale\n", session->s_mds);
1358                }
1359        }
1360        dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1361             session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1362             time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1363        spin_unlock(&session->s_cap_lock);
1364
1365        if (wake)
1366                wake_up_session_caps(session, 0);
1367}
1368
1369/*
1370 * send a session close request
1371 */
1372static int request_close_session(struct ceph_mds_client *mdsc,
1373                                 struct ceph_mds_session *session)
1374{
1375        struct ceph_msg *msg;
1376
1377        dout("request_close_session mds%d state %s seq %lld\n",
1378             session->s_mds, ceph_session_state_name(session->s_state),
1379             session->s_seq);
1380        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1381        if (!msg)
1382                return -ENOMEM;
1383        ceph_con_send(&session->s_con, msg);
1384        return 1;
1385}
1386
1387/*
1388 * Called with s_mutex held.
1389 */
1390static int __close_session(struct ceph_mds_client *mdsc,
1391                         struct ceph_mds_session *session)
1392{
1393        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1394                return 0;
1395        session->s_state = CEPH_MDS_SESSION_CLOSING;
1396        return request_close_session(mdsc, session);
1397}
1398
1399/*
1400 * Trim old(er) caps.
1401 *
1402 * Because we can't cache an inode without one or more caps, we do
1403 * this indirectly: if a cap is unused, we prune its aliases, at which
1404 * point the inode will hopefully get dropped to.
1405 *
1406 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1407 * memory pressure from the MDS, though, so it needn't be perfect.
1408 */
1409static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1410{
1411        struct ceph_mds_session *session = arg;
1412        struct ceph_inode_info *ci = ceph_inode(inode);
1413        int used, wanted, oissued, mine;
1414
1415        if (session->s_trim_caps <= 0)
1416                return -1;
1417
1418        spin_lock(&ci->i_ceph_lock);
1419        mine = cap->issued | cap->implemented;
1420        used = __ceph_caps_used(ci);
1421        wanted = __ceph_caps_file_wanted(ci);
1422        oissued = __ceph_caps_issued_other(ci, cap);
1423
1424        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1425             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1426             ceph_cap_string(used), ceph_cap_string(wanted));
1427        if (cap == ci->i_auth_cap) {
1428                if (ci->i_dirty_caps || ci->i_flushing_caps ||
1429                    !list_empty(&ci->i_cap_snaps))
1430                        goto out;
1431                if ((used | wanted) & CEPH_CAP_ANY_WR)
1432                        goto out;
1433        }
1434        /* The inode has cached pages, but it's no longer used.
1435         * we can safely drop it */
1436        if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1437            !(oissued & CEPH_CAP_FILE_CACHE)) {
1438          used = 0;
1439          oissued = 0;
1440        }
1441        if ((used | wanted) & ~oissued & mine)
1442                goto out;   /* we need these caps */
1443
1444        session->s_trim_caps--;
1445        if (oissued) {
1446                /* we aren't the only cap.. just remove us */
1447                __ceph_remove_cap(cap, true);
1448        } else {
1449                /* try dropping referring dentries */
1450                spin_unlock(&ci->i_ceph_lock);
1451                d_prune_aliases(inode);
1452                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1453                     inode, cap, atomic_read(&inode->i_count));
1454                return 0;
1455        }
1456
1457out:
1458        spin_unlock(&ci->i_ceph_lock);
1459        return 0;
1460}
1461
1462/*
1463 * Trim session cap count down to some max number.
1464 */
1465static int trim_caps(struct ceph_mds_client *mdsc,
1466                     struct ceph_mds_session *session,
1467                     int max_caps)
1468{
1469        int trim_caps = session->s_nr_caps - max_caps;
1470
1471        dout("trim_caps mds%d start: %d / %d, trim %d\n",
1472             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1473        if (trim_caps > 0) {
1474                session->s_trim_caps = trim_caps;
1475                iterate_session_caps(session, trim_caps_cb, session);
1476                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1477                     session->s_mds, session->s_nr_caps, max_caps,
1478                        trim_caps - session->s_trim_caps);
1479                session->s_trim_caps = 0;
1480        }
1481
1482        ceph_send_cap_releases(mdsc, session);
1483        return 0;
1484}
1485
1486static int check_caps_flush(struct ceph_mds_client *mdsc,
1487                            u64 want_flush_tid)
1488{
1489        int ret = 1;
1490
1491        spin_lock(&mdsc->cap_dirty_lock);
1492        if (!list_empty(&mdsc->cap_flush_list)) {
1493                struct ceph_cap_flush *cf =
1494                        list_first_entry(&mdsc->cap_flush_list,
1495                                         struct ceph_cap_flush, g_list);
1496                if (cf->tid <= want_flush_tid) {
1497                        dout("check_caps_flush still flushing tid "
1498                             "%llu <= %llu\n", cf->tid, want_flush_tid);
1499                        ret = 0;
1500                }
1501        }
1502        spin_unlock(&mdsc->cap_dirty_lock);
1503        return ret;
1504}
1505
1506/*
1507 * flush all dirty inode data to disk.
1508 *
1509 * returns true if we've flushed through want_flush_tid
1510 */
1511static void wait_caps_flush(struct ceph_mds_client *mdsc,
1512                            u64 want_flush_tid)
1513{
1514        dout("check_caps_flush want %llu\n", want_flush_tid);
1515
1516        wait_event(mdsc->cap_flushing_wq,
1517                   check_caps_flush(mdsc, want_flush_tid));
1518
1519        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1520}
1521
1522/*
1523 * called under s_mutex
1524 */
1525void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1526                            struct ceph_mds_session *session)
1527{
1528        struct ceph_msg *msg = NULL;
1529        struct ceph_mds_cap_release *head;
1530        struct ceph_mds_cap_item *item;
1531        struct ceph_cap *cap;
1532        LIST_HEAD(tmp_list);
1533        int num_cap_releases;
1534
1535        spin_lock(&session->s_cap_lock);
1536again:
1537        list_splice_init(&session->s_cap_releases, &tmp_list);
1538        num_cap_releases = session->s_num_cap_releases;
1539        session->s_num_cap_releases = 0;
1540        spin_unlock(&session->s_cap_lock);
1541
1542        while (!list_empty(&tmp_list)) {
1543                if (!msg) {
1544                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1545                                        PAGE_SIZE, GFP_NOFS, false);
1546                        if (!msg)
1547                                goto out_err;
1548                        head = msg->front.iov_base;
1549                        head->num = cpu_to_le32(0);
1550                        msg->front.iov_len = sizeof(*head);
1551                }
1552                cap = list_first_entry(&tmp_list, struct ceph_cap,
1553                                        session_caps);
1554                list_del(&cap->session_caps);
1555                num_cap_releases--;
1556
1557                head = msg->front.iov_base;
1558                le32_add_cpu(&head->num, 1);
1559                item = msg->front.iov_base + msg->front.iov_len;
1560                item->ino = cpu_to_le64(cap->cap_ino);
1561                item->cap_id = cpu_to_le64(cap->cap_id);
1562                item->migrate_seq = cpu_to_le32(cap->mseq);
1563                item->seq = cpu_to_le32(cap->issue_seq);
1564                msg->front.iov_len += sizeof(*item);
1565
1566                ceph_put_cap(mdsc, cap);
1567
1568                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1569                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1570                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1571                        ceph_con_send(&session->s_con, msg);
1572                        msg = NULL;
1573                }
1574        }
1575
1576        BUG_ON(num_cap_releases != 0);
1577
1578        spin_lock(&session->s_cap_lock);
1579        if (!list_empty(&session->s_cap_releases))
1580                goto again;
1581        spin_unlock(&session->s_cap_lock);
1582
1583        if (msg) {
1584                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1585                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1586                ceph_con_send(&session->s_con, msg);
1587        }
1588        return;
1589out_err:
1590        pr_err("send_cap_releases mds%d, failed to allocate message\n",
1591                session->s_mds);
1592        spin_lock(&session->s_cap_lock);
1593        list_splice(&tmp_list, &session->s_cap_releases);
1594        session->s_num_cap_releases += num_cap_releases;
1595        spin_unlock(&session->s_cap_lock);
1596}
1597
1598/*
1599 * requests
1600 */
1601
1602int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1603                                    struct inode *dir)
1604{
1605        struct ceph_inode_info *ci = ceph_inode(dir);
1606        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1607        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1608        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1609        int order, num_entries;
1610
1611        spin_lock(&ci->i_ceph_lock);
1612        num_entries = ci->i_files + ci->i_subdirs;
1613        spin_unlock(&ci->i_ceph_lock);
1614        num_entries = max(num_entries, 1);
1615        num_entries = min(num_entries, opt->max_readdir);
1616
1617        order = get_order(size * num_entries);
1618        while (order >= 0) {
1619                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
1620                                                             __GFP_NOWARN,
1621                                                             order);
1622                if (rinfo->dir_entries)
1623                        break;
1624                order--;
1625        }
1626        if (!rinfo->dir_entries)
1627                return -ENOMEM;
1628
1629        num_entries = (PAGE_SIZE << order) / size;
1630        num_entries = min(num_entries, opt->max_readdir);
1631
1632        rinfo->dir_buf_size = PAGE_SIZE << order;
1633        req->r_num_caps = num_entries + 1;
1634        req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1635        req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1636        return 0;
1637}
1638
1639/*
1640 * Create an mds request.
1641 */
1642struct ceph_mds_request *
1643ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1644{
1645        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1646
1647        if (!req)
1648                return ERR_PTR(-ENOMEM);
1649
1650        mutex_init(&req->r_fill_mutex);
1651        req->r_mdsc = mdsc;
1652        req->r_started = jiffies;
1653        req->r_resend_mds = -1;
1654        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1655        INIT_LIST_HEAD(&req->r_unsafe_target_item);
1656        req->r_fmode = -1;
1657        kref_init(&req->r_kref);
1658        RB_CLEAR_NODE(&req->r_node);
1659        INIT_LIST_HEAD(&req->r_wait);
1660        init_completion(&req->r_completion);
1661        init_completion(&req->r_safe_completion);
1662        INIT_LIST_HEAD(&req->r_unsafe_item);
1663
1664        req->r_stamp = current_fs_time(mdsc->fsc->sb);
1665
1666        req->r_op = op;
1667        req->r_direct_mode = mode;
1668        return req;
1669}
1670
1671/*
1672 * return oldest (lowest) request, tid in request tree, 0 if none.
1673 *
1674 * called under mdsc->mutex.
1675 */
1676static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1677{
1678        if (RB_EMPTY_ROOT(&mdsc->request_tree))
1679                return NULL;
1680        return rb_entry(rb_first(&mdsc->request_tree),
1681                        struct ceph_mds_request, r_node);
1682}
1683
1684static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1685{
1686        return mdsc->oldest_tid;
1687}
1688
1689/*
1690 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1691 * on build_path_from_dentry in fs/cifs/dir.c.
1692 *
1693 * If @stop_on_nosnap, generate path relative to the first non-snapped
1694 * inode.
1695 *
1696 * Encode hidden .snap dirs as a double /, i.e.
1697 *   foo/.snap/bar -> foo//bar
1698 */
1699char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1700                           int stop_on_nosnap)
1701{
1702        struct dentry *temp;
1703        char *path;
1704        int len, pos;
1705        unsigned seq;
1706
1707        if (dentry == NULL)
1708                return ERR_PTR(-EINVAL);
1709
1710retry:
1711        len = 0;
1712        seq = read_seqbegin(&rename_lock);
1713        rcu_read_lock();
1714        for (temp = dentry; !IS_ROOT(temp);) {
1715                struct inode *inode = d_inode(temp);
1716                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1717                        len++;  /* slash only */
1718                else if (stop_on_nosnap && inode &&
1719                         ceph_snap(inode) == CEPH_NOSNAP)
1720                        break;
1721                else
1722                        len += 1 + temp->d_name.len;
1723                temp = temp->d_parent;
1724        }
1725        rcu_read_unlock();
1726        if (len)
1727                len--;  /* no leading '/' */
1728
1729        path = kmalloc(len+1, GFP_NOFS);
1730        if (path == NULL)
1731                return ERR_PTR(-ENOMEM);
1732        pos = len;
1733        path[pos] = 0;  /* trailing null */
1734        rcu_read_lock();
1735        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1736                struct inode *inode;
1737
1738                spin_lock(&temp->d_lock);
1739                inode = d_inode(temp);
1740                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1741                        dout("build_path path+%d: %p SNAPDIR\n",
1742                             pos, temp);
1743                } else if (stop_on_nosnap && inode &&
1744                           ceph_snap(inode) == CEPH_NOSNAP) {
1745                        spin_unlock(&temp->d_lock);
1746                        break;
1747                } else {
1748                        pos -= temp->d_name.len;
1749                        if (pos < 0) {
1750                                spin_unlock(&temp->d_lock);
1751                                break;
1752                        }
1753                        strncpy(path + pos, temp->d_name.name,
1754                                temp->d_name.len);
1755                }
1756                spin_unlock(&temp->d_lock);
1757                if (pos)
1758                        path[--pos] = '/';
1759                temp = temp->d_parent;
1760        }
1761        rcu_read_unlock();
1762        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1763                pr_err("build_path did not end path lookup where "
1764                       "expected, namelen is %d, pos is %d\n", len, pos);
1765                /* presumably this is only possible if racing with a
1766                   rename of one of the parent directories (we can not
1767                   lock the dentries above us to prevent this, but
1768                   retrying should be harmless) */
1769                kfree(path);
1770                goto retry;
1771        }
1772
1773        *base = ceph_ino(d_inode(temp));
1774        *plen = len;
1775        dout("build_path on %p %d built %llx '%.*s'\n",
1776             dentry, d_count(dentry), *base, len, path);
1777        return path;
1778}
1779
1780static int build_dentry_path(struct dentry *dentry,
1781                             const char **ppath, int *ppathlen, u64 *pino,
1782                             int *pfreepath)
1783{
1784        char *path;
1785
1786        if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
1787                *pino = ceph_ino(d_inode(dentry->d_parent));
1788                *ppath = dentry->d_name.name;
1789                *ppathlen = dentry->d_name.len;
1790                return 0;
1791        }
1792        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1793        if (IS_ERR(path))
1794                return PTR_ERR(path);
1795        *ppath = path;
1796        *pfreepath = 1;
1797        return 0;
1798}
1799
1800static int build_inode_path(struct inode *inode,
1801                            const char **ppath, int *ppathlen, u64 *pino,
1802                            int *pfreepath)
1803{
1804        struct dentry *dentry;
1805        char *path;
1806
1807        if (ceph_snap(inode) == CEPH_NOSNAP) {
1808                *pino = ceph_ino(inode);
1809                *ppathlen = 0;
1810                return 0;
1811        }
1812        dentry = d_find_alias(inode);
1813        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1814        dput(dentry);
1815        if (IS_ERR(path))
1816                return PTR_ERR(path);
1817        *ppath = path;
1818        *pfreepath = 1;
1819        return 0;
1820}
1821
1822/*
1823 * request arguments may be specified via an inode *, a dentry *, or
1824 * an explicit ino+path.
1825 */
1826static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1827                                  const char *rpath, u64 rino,
1828                                  const char **ppath, int *pathlen,
1829                                  u64 *ino, int *freepath)
1830{
1831        int r = 0;
1832
1833        if (rinode) {
1834                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1835                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1836                     ceph_snap(rinode));
1837        } else if (rdentry) {
1838                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1839                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1840                     *ppath);
1841        } else if (rpath || rino) {
1842                *ino = rino;
1843                *ppath = rpath;
1844                *pathlen = rpath ? strlen(rpath) : 0;
1845                dout(" path %.*s\n", *pathlen, rpath);
1846        }
1847
1848        return r;
1849}
1850
1851/*
1852 * called under mdsc->mutex
1853 */
1854static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1855                                               struct ceph_mds_request *req,
1856                                               int mds, bool drop_cap_releases)
1857{
1858        struct ceph_msg *msg;
1859        struct ceph_mds_request_head *head;
1860        const char *path1 = NULL;
1861        const char *path2 = NULL;
1862        u64 ino1 = 0, ino2 = 0;
1863        int pathlen1 = 0, pathlen2 = 0;
1864        int freepath1 = 0, freepath2 = 0;
1865        int len;
1866        u16 releases;
1867        void *p, *end;
1868        int ret;
1869
1870        ret = set_request_path_attr(req->r_inode, req->r_dentry,
1871                              req->r_path1, req->r_ino1.ino,
1872                              &path1, &pathlen1, &ino1, &freepath1);
1873        if (ret < 0) {
1874                msg = ERR_PTR(ret);
1875                goto out;
1876        }
1877
1878        ret = set_request_path_attr(NULL, req->r_old_dentry,
1879                              req->r_path2, req->r_ino2.ino,
1880                              &path2, &pathlen2, &ino2, &freepath2);
1881        if (ret < 0) {
1882                msg = ERR_PTR(ret);
1883                goto out_free1;
1884        }
1885
1886        len = sizeof(*head) +
1887                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1888                sizeof(struct ceph_timespec);
1889
1890        /* calculate (max) length for cap releases */
1891        len += sizeof(struct ceph_mds_request_release) *
1892                (!!req->r_inode_drop + !!req->r_dentry_drop +
1893                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1894        if (req->r_dentry_drop)
1895                len += req->r_dentry->d_name.len;
1896        if (req->r_old_dentry_drop)
1897                len += req->r_old_dentry->d_name.len;
1898
1899        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1900        if (!msg) {
1901                msg = ERR_PTR(-ENOMEM);
1902                goto out_free2;
1903        }
1904
1905        msg->hdr.version = cpu_to_le16(2);
1906        msg->hdr.tid = cpu_to_le64(req->r_tid);
1907
1908        head = msg->front.iov_base;
1909        p = msg->front.iov_base + sizeof(*head);
1910        end = msg->front.iov_base + msg->front.iov_len;
1911
1912        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1913        head->op = cpu_to_le32(req->r_op);
1914        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1915        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1916        head->args = req->r_args;
1917
1918        ceph_encode_filepath(&p, end, ino1, path1);
1919        ceph_encode_filepath(&p, end, ino2, path2);
1920
1921        /* make note of release offset, in case we need to replay */
1922        req->r_request_release_offset = p - msg->front.iov_base;
1923
1924        /* cap releases */
1925        releases = 0;
1926        if (req->r_inode_drop)
1927                releases += ceph_encode_inode_release(&p,
1928                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
1929                      mds, req->r_inode_drop, req->r_inode_unless, 0);
1930        if (req->r_dentry_drop)
1931                releases += ceph_encode_dentry_release(&p, req->r_dentry,
1932                       mds, req->r_dentry_drop, req->r_dentry_unless);
1933        if (req->r_old_dentry_drop)
1934                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1935                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1936        if (req->r_old_inode_drop)
1937                releases += ceph_encode_inode_release(&p,
1938                      d_inode(req->r_old_dentry),
1939                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1940
1941        if (drop_cap_releases) {
1942                releases = 0;
1943                p = msg->front.iov_base + req->r_request_release_offset;
1944        }
1945
1946        head->num_releases = cpu_to_le16(releases);
1947
1948        /* time stamp */
1949        {
1950                struct ceph_timespec ts;
1951                ceph_encode_timespec(&ts, &req->r_stamp);
1952                ceph_encode_copy(&p, &ts, sizeof(ts));
1953        }
1954
1955        BUG_ON(p > end);
1956        msg->front.iov_len = p - msg->front.iov_base;
1957        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1958
1959        if (req->r_pagelist) {
1960                struct ceph_pagelist *pagelist = req->r_pagelist;
1961                atomic_inc(&pagelist->refcnt);
1962                ceph_msg_data_add_pagelist(msg, pagelist);
1963                msg->hdr.data_len = cpu_to_le32(pagelist->length);
1964        } else {
1965                msg->hdr.data_len = 0;
1966        }
1967
1968        msg->hdr.data_off = cpu_to_le16(0);
1969
1970out_free2:
1971        if (freepath2)
1972                kfree((char *)path2);
1973out_free1:
1974        if (freepath1)
1975                kfree((char *)path1);
1976out:
1977        return msg;
1978}
1979
1980/*
1981 * called under mdsc->mutex if error, under no mutex if
1982 * success.
1983 */
1984static void complete_request(struct ceph_mds_client *mdsc,
1985                             struct ceph_mds_request *req)
1986{
1987        if (req->r_callback)
1988                req->r_callback(mdsc, req);
1989        else
1990                complete_all(&req->r_completion);
1991}
1992
1993/*
1994 * called under mdsc->mutex
1995 */
1996static int __prepare_send_request(struct ceph_mds_client *mdsc,
1997                                  struct ceph_mds_request *req,
1998                                  int mds, bool drop_cap_releases)
1999{
2000        struct ceph_mds_request_head *rhead;
2001        struct ceph_msg *msg;
2002        int flags = 0;
2003
2004        req->r_attempts++;
2005        if (req->r_inode) {
2006                struct ceph_cap *cap =
2007                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2008
2009                if (cap)
2010                        req->r_sent_on_mseq = cap->mseq;
2011                else
2012                        req->r_sent_on_mseq = -1;
2013        }
2014        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2015             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2016
2017        if (req->r_got_unsafe) {
2018                void *p;
2019                /*
2020                 * Replay.  Do not regenerate message (and rebuild
2021                 * paths, etc.); just use the original message.
2022                 * Rebuilding paths will break for renames because
2023                 * d_move mangles the src name.
2024                 */
2025                msg = req->r_request;
2026                rhead = msg->front.iov_base;
2027
2028                flags = le32_to_cpu(rhead->flags);
2029                flags |= CEPH_MDS_FLAG_REPLAY;
2030                rhead->flags = cpu_to_le32(flags);
2031
2032                if (req->r_target_inode)
2033                        rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2034
2035                rhead->num_retry = req->r_attempts - 1;
2036
2037                /* remove cap/dentry releases from message */
2038                rhead->num_releases = 0;
2039
2040                /* time stamp */
2041                p = msg->front.iov_base + req->r_request_release_offset;
2042                {
2043                        struct ceph_timespec ts;
2044                        ceph_encode_timespec(&ts, &req->r_stamp);
2045                        ceph_encode_copy(&p, &ts, sizeof(ts));
2046                }
2047
2048                msg->front.iov_len = p - msg->front.iov_base;
2049                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2050                return 0;
2051        }
2052
2053        if (req->r_request) {
2054                ceph_msg_put(req->r_request);
2055                req->r_request = NULL;
2056        }
2057        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2058        if (IS_ERR(msg)) {
2059                req->r_err = PTR_ERR(msg);
2060                return PTR_ERR(msg);
2061        }
2062        req->r_request = msg;
2063
2064        rhead = msg->front.iov_base;
2065        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2066        if (req->r_got_unsafe)
2067                flags |= CEPH_MDS_FLAG_REPLAY;
2068        if (req->r_locked_dir)
2069                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2070        rhead->flags = cpu_to_le32(flags);
2071        rhead->num_fwd = req->r_num_fwd;
2072        rhead->num_retry = req->r_attempts - 1;
2073        rhead->ino = 0;
2074
2075        dout(" r_locked_dir = %p\n", req->r_locked_dir);
2076        return 0;
2077}
2078
2079/*
2080 * send request, or put it on the appropriate wait list.
2081 */
2082static int __do_request(struct ceph_mds_client *mdsc,
2083                        struct ceph_mds_request *req)
2084{
2085        struct ceph_mds_session *session = NULL;
2086        int mds = -1;
2087        int err = 0;
2088
2089        if (req->r_err || req->r_got_result) {
2090                if (req->r_aborted)
2091                        __unregister_request(mdsc, req);
2092                goto out;
2093        }
2094
2095        if (req->r_timeout &&
2096            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2097                dout("do_request timed out\n");
2098                err = -EIO;
2099                goto finish;
2100        }
2101        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2102                dout("do_request forced umount\n");
2103                err = -EIO;
2104                goto finish;
2105        }
2106
2107        put_request_session(req);
2108
2109        mds = __choose_mds(mdsc, req);
2110        if (mds < 0 ||
2111            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2112                if (mdsc->mdsmap_err) {
2113                        err = mdsc->mdsmap_err;
2114                        dout("do_request mdsmap err %d\n", err);
2115                        goto finish;
2116                }
2117                dout("do_request no mds or not active, waiting for map\n");
2118                list_add(&req->r_wait, &mdsc->waiting_for_map);
2119                goto out;
2120        }
2121
2122        /* get, open session */
2123        session = __ceph_lookup_mds_session(mdsc, mds);
2124        if (!session) {
2125                session = register_session(mdsc, mds);
2126                if (IS_ERR(session)) {
2127                        err = PTR_ERR(session);
2128                        goto finish;
2129                }
2130        }
2131        req->r_session = get_session(session);
2132
2133        dout("do_request mds%d session %p state %s\n", mds, session,
2134             ceph_session_state_name(session->s_state));
2135        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2136            session->s_state != CEPH_MDS_SESSION_HUNG) {
2137                if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2138                        err = -EACCES;
2139                        goto out_session;
2140                }
2141                if (session->s_state == CEPH_MDS_SESSION_NEW ||
2142                    session->s_state == CEPH_MDS_SESSION_CLOSING)
2143                        __open_session(mdsc, session);
2144                list_add(&req->r_wait, &session->s_waiting);
2145                goto out_session;
2146        }
2147
2148        /* send request */
2149        req->r_resend_mds = -1;   /* forget any previous mds hint */
2150
2151        if (req->r_request_started == 0)   /* note request start time */
2152                req->r_request_started = jiffies;
2153
2154        err = __prepare_send_request(mdsc, req, mds, false);
2155        if (!err) {
2156                ceph_msg_get(req->r_request);
2157                ceph_con_send(&session->s_con, req->r_request);
2158        }
2159
2160out_session:
2161        ceph_put_mds_session(session);
2162finish:
2163        if (err) {
2164                dout("__do_request early error %d\n", err);
2165                req->r_err = err;
2166                complete_request(mdsc, req);
2167                __unregister_request(mdsc, req);
2168        }
2169out:
2170        return err;
2171}
2172
2173/*
2174 * called under mdsc->mutex
2175 */
2176static void __wake_requests(struct ceph_mds_client *mdsc,
2177                            struct list_head *head)
2178{
2179        struct ceph_mds_request *req;
2180        LIST_HEAD(tmp_list);
2181
2182        list_splice_init(head, &tmp_list);
2183
2184        while (!list_empty(&tmp_list)) {
2185                req = list_entry(tmp_list.next,
2186                                 struct ceph_mds_request, r_wait);
2187                list_del_init(&req->r_wait);
2188                dout(" wake request %p tid %llu\n", req, req->r_tid);
2189                __do_request(mdsc, req);
2190        }
2191}
2192
2193/*
2194 * Wake up threads with requests pending for @mds, so that they can
2195 * resubmit their requests to a possibly different mds.
2196 */
2197static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2198{
2199        struct ceph_mds_request *req;
2200        struct rb_node *p = rb_first(&mdsc->request_tree);
2201
2202        dout("kick_requests mds%d\n", mds);
2203        while (p) {
2204                req = rb_entry(p, struct ceph_mds_request, r_node);
2205                p = rb_next(p);
2206                if (req->r_got_unsafe)
2207                        continue;
2208                if (req->r_attempts > 0)
2209                        continue; /* only new requests */
2210                if (req->r_session &&
2211                    req->r_session->s_mds == mds) {
2212                        dout(" kicking tid %llu\n", req->r_tid);
2213                        list_del_init(&req->r_wait);
2214                        __do_request(mdsc, req);
2215                }
2216        }
2217}
2218
2219void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2220                              struct ceph_mds_request *req)
2221{
2222        dout("submit_request on %p\n", req);
2223        mutex_lock(&mdsc->mutex);
2224        __register_request(mdsc, req, NULL);
2225        __do_request(mdsc, req);
2226        mutex_unlock(&mdsc->mutex);
2227}
2228
2229/*
2230 * Synchrously perform an mds request.  Take care of all of the
2231 * session setup, forwarding, retry details.
2232 */
2233int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2234                         struct inode *dir,
2235                         struct ceph_mds_request *req)
2236{
2237        int err;
2238
2239        dout("do_request on %p\n", req);
2240
2241        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
2242        if (req->r_inode)
2243                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2244        if (req->r_locked_dir)
2245                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2246        if (req->r_old_dentry_dir)
2247                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2248                                  CEPH_CAP_PIN);
2249
2250        /* issue */
2251        mutex_lock(&mdsc->mutex);
2252        __register_request(mdsc, req, dir);
2253        __do_request(mdsc, req);
2254
2255        if (req->r_err) {
2256                err = req->r_err;
2257                goto out;
2258        }
2259
2260        /* wait */
2261        mutex_unlock(&mdsc->mutex);
2262        dout("do_request waiting\n");
2263        if (!req->r_timeout && req->r_wait_for_completion) {
2264                err = req->r_wait_for_completion(mdsc, req);
2265        } else {
2266                long timeleft = wait_for_completion_killable_timeout(
2267                                        &req->r_completion,
2268                                        ceph_timeout_jiffies(req->r_timeout));
2269                if (timeleft > 0)
2270                        err = 0;
2271                else if (!timeleft)
2272                        err = -EIO;  /* timed out */
2273                else
2274                        err = timeleft;  /* killed */
2275        }
2276        dout("do_request waited, got %d\n", err);
2277        mutex_lock(&mdsc->mutex);
2278
2279        /* only abort if we didn't race with a real reply */
2280        if (req->r_got_result) {
2281                err = le32_to_cpu(req->r_reply_info.head->result);
2282        } else if (err < 0) {
2283                dout("aborted request %lld with %d\n", req->r_tid, err);
2284
2285                /*
2286                 * ensure we aren't running concurrently with
2287                 * ceph_fill_trace or ceph_readdir_prepopulate, which
2288                 * rely on locks (dir mutex) held by our caller.
2289                 */
2290                mutex_lock(&req->r_fill_mutex);
2291                req->r_err = err;
2292                req->r_aborted = true;
2293                mutex_unlock(&req->r_fill_mutex);
2294
2295                if (req->r_locked_dir &&
2296                    (req->r_op & CEPH_MDS_OP_WRITE))
2297                        ceph_invalidate_dir_request(req);
2298        } else {
2299                err = req->r_err;
2300        }
2301
2302out:
2303        mutex_unlock(&mdsc->mutex);
2304        dout("do_request %p done, result %d\n", req, err);
2305        return err;
2306}
2307
2308/*
2309 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2310 * namespace request.
2311 */
2312void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2313{
2314        struct inode *inode = req->r_locked_dir;
2315
2316        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2317
2318        ceph_dir_clear_complete(inode);
2319        if (req->r_dentry)
2320                ceph_invalidate_dentry_lease(req->r_dentry);
2321        if (req->r_old_dentry)
2322                ceph_invalidate_dentry_lease(req->r_old_dentry);
2323}
2324
2325/*
2326 * Handle mds reply.
2327 *
2328 * We take the session mutex and parse and process the reply immediately.
2329 * This preserves the logical ordering of replies, capabilities, etc., sent
2330 * by the MDS as they are applied to our local cache.
2331 */
2332static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2333{
2334        struct ceph_mds_client *mdsc = session->s_mdsc;
2335        struct ceph_mds_request *req;
2336        struct ceph_mds_reply_head *head = msg->front.iov_base;
2337        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2338        struct ceph_snap_realm *realm;
2339        u64 tid;
2340        int err, result;
2341        int mds = session->s_mds;
2342
2343        if (msg->front.iov_len < sizeof(*head)) {
2344                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2345                ceph_msg_dump(msg);
2346                return;
2347        }
2348
2349        /* get request, session */
2350        tid = le64_to_cpu(msg->hdr.tid);
2351        mutex_lock(&mdsc->mutex);
2352        req = lookup_get_request(mdsc, tid);
2353        if (!req) {
2354                dout("handle_reply on unknown tid %llu\n", tid);
2355                mutex_unlock(&mdsc->mutex);
2356                return;
2357        }
2358        dout("handle_reply %p\n", req);
2359
2360        /* correct session? */
2361        if (req->r_session != session) {
2362                pr_err("mdsc_handle_reply got %llu on session mds%d"
2363                       " not mds%d\n", tid, session->s_mds,
2364                       req->r_session ? req->r_session->s_mds : -1);
2365                mutex_unlock(&mdsc->mutex);
2366                goto out;
2367        }
2368
2369        /* dup? */
2370        if ((req->r_got_unsafe && !head->safe) ||
2371            (req->r_got_safe && head->safe)) {
2372                pr_warn("got a dup %s reply on %llu from mds%d\n",
2373                           head->safe ? "safe" : "unsafe", tid, mds);
2374                mutex_unlock(&mdsc->mutex);
2375                goto out;
2376        }
2377        if (req->r_got_safe) {
2378                pr_warn("got unsafe after safe on %llu from mds%d\n",
2379                           tid, mds);
2380                mutex_unlock(&mdsc->mutex);
2381                goto out;
2382        }
2383
2384        result = le32_to_cpu(head->result);
2385
2386        /*
2387         * Handle an ESTALE
2388         * if we're not talking to the authority, send to them
2389         * if the authority has changed while we weren't looking,
2390         * send to new authority
2391         * Otherwise we just have to return an ESTALE
2392         */
2393        if (result == -ESTALE) {
2394                dout("got ESTALE on request %llu", req->r_tid);
2395                req->r_resend_mds = -1;
2396                if (req->r_direct_mode != USE_AUTH_MDS) {
2397                        dout("not using auth, setting for that now");
2398                        req->r_direct_mode = USE_AUTH_MDS;
2399                        __do_request(mdsc, req);
2400                        mutex_unlock(&mdsc->mutex);
2401                        goto out;
2402                } else  {
2403                        int mds = __choose_mds(mdsc, req);
2404                        if (mds >= 0 && mds != req->r_session->s_mds) {
2405                                dout("but auth changed, so resending");
2406                                __do_request(mdsc, req);
2407                                mutex_unlock(&mdsc->mutex);
2408                                goto out;
2409                        }
2410                }
2411                dout("have to return ESTALE on request %llu", req->r_tid);
2412        }
2413
2414
2415        if (head->safe) {
2416                req->r_got_safe = true;
2417                __unregister_request(mdsc, req);
2418
2419                if (req->r_got_unsafe) {
2420                        /*
2421                         * We already handled the unsafe response, now do the
2422                         * cleanup.  No need to examine the response; the MDS
2423                         * doesn't include any result info in the safe
2424                         * response.  And even if it did, there is nothing
2425                         * useful we could do with a revised return value.
2426                         */
2427                        dout("got safe reply %llu, mds%d\n", tid, mds);
2428
2429                        /* last unsafe request during umount? */
2430                        if (mdsc->stopping && !__get_oldest_req(mdsc))
2431                                complete_all(&mdsc->safe_umount_waiters);
2432                        mutex_unlock(&mdsc->mutex);
2433                        goto out;
2434                }
2435        } else {
2436                req->r_got_unsafe = true;
2437                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2438                if (req->r_unsafe_dir) {
2439                        struct ceph_inode_info *ci =
2440                                        ceph_inode(req->r_unsafe_dir);
2441                        spin_lock(&ci->i_unsafe_lock);
2442                        list_add_tail(&req->r_unsafe_dir_item,
2443                                      &ci->i_unsafe_dirops);
2444                        spin_unlock(&ci->i_unsafe_lock);
2445                }
2446        }
2447
2448        dout("handle_reply tid %lld result %d\n", tid, result);
2449        rinfo = &req->r_reply_info;
2450        err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2451        mutex_unlock(&mdsc->mutex);
2452
2453        mutex_lock(&session->s_mutex);
2454        if (err < 0) {
2455                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2456                ceph_msg_dump(msg);
2457                goto out_err;
2458        }
2459
2460        /* snap trace */
2461        realm = NULL;
2462        if (rinfo->snapblob_len) {
2463                down_write(&mdsc->snap_rwsem);
2464                ceph_update_snap_trace(mdsc, rinfo->snapblob,
2465                                rinfo->snapblob + rinfo->snapblob_len,
2466                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2467                                &realm);
2468                downgrade_write(&mdsc->snap_rwsem);
2469        } else {
2470                down_read(&mdsc->snap_rwsem);
2471        }
2472
2473        /* insert trace into our cache */
2474        mutex_lock(&req->r_fill_mutex);
2475        current->journal_info = req;
2476        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2477        if (err == 0) {
2478                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2479                                    req->r_op == CEPH_MDS_OP_LSSNAP))
2480                        ceph_readdir_prepopulate(req, req->r_session);
2481                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2482        }
2483        current->journal_info = NULL;
2484        mutex_unlock(&req->r_fill_mutex);
2485
2486        up_read(&mdsc->snap_rwsem);
2487        if (realm)
2488                ceph_put_snap_realm(mdsc, realm);
2489
2490        if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2491                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2492                spin_lock(&ci->i_unsafe_lock);
2493                list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2494                spin_unlock(&ci->i_unsafe_lock);
2495        }
2496out_err:
2497        mutex_lock(&mdsc->mutex);
2498        if (!req->r_aborted) {
2499                if (err) {
2500                        req->r_err = err;
2501                } else {
2502                        req->r_reply =  ceph_msg_get(msg);
2503                        req->r_got_result = true;
2504                }
2505        } else {
2506                dout("reply arrived after request %lld was aborted\n", tid);
2507        }
2508        mutex_unlock(&mdsc->mutex);
2509
2510        mutex_unlock(&session->s_mutex);
2511
2512        /* kick calling process */
2513        complete_request(mdsc, req);
2514out:
2515        ceph_mdsc_put_request(req);
2516        return;
2517}
2518
2519
2520
2521/*
2522 * handle mds notification that our request has been forwarded.
2523 */
2524static void handle_forward(struct ceph_mds_client *mdsc,
2525                           struct ceph_mds_session *session,
2526                           struct ceph_msg *msg)
2527{
2528        struct ceph_mds_request *req;
2529        u64 tid = le64_to_cpu(msg->hdr.tid);
2530        u32 next_mds;
2531        u32 fwd_seq;
2532        int err = -EINVAL;
2533        void *p = msg->front.iov_base;
2534        void *end = p + msg->front.iov_len;
2535
2536        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2537        next_mds = ceph_decode_32(&p);
2538        fwd_seq = ceph_decode_32(&p);
2539
2540        mutex_lock(&mdsc->mutex);
2541        req = lookup_get_request(mdsc, tid);
2542        if (!req) {
2543                dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2544                goto out;  /* dup reply? */
2545        }
2546
2547        if (req->r_aborted) {
2548                dout("forward tid %llu aborted, unregistering\n", tid);
2549                __unregister_request(mdsc, req);
2550        } else if (fwd_seq <= req->r_num_fwd) {
2551                dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2552                     tid, next_mds, req->r_num_fwd, fwd_seq);
2553        } else {
2554                /* resend. forward race not possible; mds would drop */
2555                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2556                BUG_ON(req->r_err);
2557                BUG_ON(req->r_got_result);
2558                req->r_attempts = 0;
2559                req->r_num_fwd = fwd_seq;
2560                req->r_resend_mds = next_mds;
2561                put_request_session(req);
2562                __do_request(mdsc, req);
2563        }
2564        ceph_mdsc_put_request(req);
2565out:
2566        mutex_unlock(&mdsc->mutex);
2567        return;
2568
2569bad:
2570        pr_err("mdsc_handle_forward decode error err=%d\n", err);
2571}
2572
2573/*
2574 * handle a mds session control message
2575 */
2576static void handle_session(struct ceph_mds_session *session,
2577                           struct ceph_msg *msg)
2578{
2579        struct ceph_mds_client *mdsc = session->s_mdsc;
2580        u32 op;
2581        u64 seq;
2582        int mds = session->s_mds;
2583        struct ceph_mds_session_head *h = msg->front.iov_base;
2584        int wake = 0;
2585
2586        /* decode */
2587        if (msg->front.iov_len != sizeof(*h))
2588                goto bad;
2589        op = le32_to_cpu(h->op);
2590        seq = le64_to_cpu(h->seq);
2591
2592        mutex_lock(&mdsc->mutex);
2593        if (op == CEPH_SESSION_CLOSE)
2594                __unregister_session(mdsc, session);
2595        /* FIXME: this ttl calculation is generous */
2596        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2597        mutex_unlock(&mdsc->mutex);
2598
2599        mutex_lock(&session->s_mutex);
2600
2601        dout("handle_session mds%d %s %p state %s seq %llu\n",
2602             mds, ceph_session_op_name(op), session,
2603             ceph_session_state_name(session->s_state), seq);
2604
2605        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2606                session->s_state = CEPH_MDS_SESSION_OPEN;
2607                pr_info("mds%d came back\n", session->s_mds);
2608        }
2609
2610        switch (op) {
2611        case CEPH_SESSION_OPEN:
2612                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2613                        pr_info("mds%d reconnect success\n", session->s_mds);
2614                session->s_state = CEPH_MDS_SESSION_OPEN;
2615                renewed_caps(mdsc, session, 0);
2616                wake = 1;
2617                if (mdsc->stopping)
2618                        __close_session(mdsc, session);
2619                break;
2620
2621        case CEPH_SESSION_RENEWCAPS:
2622                if (session->s_renew_seq == seq)
2623                        renewed_caps(mdsc, session, 1);
2624                break;
2625
2626        case CEPH_SESSION_CLOSE:
2627                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2628                        pr_info("mds%d reconnect denied\n", session->s_mds);
2629                cleanup_session_requests(mdsc, session);
2630                remove_session_caps(session);
2631                wake = 2; /* for good measure */
2632                wake_up_all(&mdsc->session_close_wq);
2633                break;
2634
2635        case CEPH_SESSION_STALE:
2636                pr_info("mds%d caps went stale, renewing\n",
2637                        session->s_mds);
2638                spin_lock(&session->s_gen_ttl_lock);
2639                session->s_cap_gen++;
2640                session->s_cap_ttl = jiffies - 1;
2641                spin_unlock(&session->s_gen_ttl_lock);
2642                send_renew_caps(mdsc, session);
2643                break;
2644
2645        case CEPH_SESSION_RECALL_STATE:
2646                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2647                break;
2648
2649        case CEPH_SESSION_FLUSHMSG:
2650                send_flushmsg_ack(mdsc, session, seq);
2651                break;
2652
2653        case CEPH_SESSION_FORCE_RO:
2654                dout("force_session_readonly %p\n", session);
2655                spin_lock(&session->s_cap_lock);
2656                session->s_readonly = true;
2657                spin_unlock(&session->s_cap_lock);
2658                wake_up_session_caps(session, 0);
2659                break;
2660
2661        case CEPH_SESSION_REJECT:
2662                WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
2663                pr_info("mds%d rejected session\n", session->s_mds);
2664                session->s_state = CEPH_MDS_SESSION_REJECTED;
2665                cleanup_session_requests(mdsc, session);
2666                remove_session_caps(session);
2667                wake = 2; /* for good measure */
2668                break;
2669
2670        default:
2671                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2672                WARN_ON(1);
2673        }
2674
2675        mutex_unlock(&session->s_mutex);
2676        if (wake) {
2677                mutex_lock(&mdsc->mutex);
2678                __wake_requests(mdsc, &session->s_waiting);
2679                if (wake == 2)
2680                        kick_requests(mdsc, mds);
2681                mutex_unlock(&mdsc->mutex);
2682        }
2683        return;
2684
2685bad:
2686        pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2687               (int)msg->front.iov_len);
2688        ceph_msg_dump(msg);
2689        return;
2690}
2691
2692
2693/*
2694 * called under session->mutex.
2695 */
2696static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2697                                   struct ceph_mds_session *session)
2698{
2699        struct ceph_mds_request *req, *nreq;
2700        struct rb_node *p;
2701        int err;
2702
2703        dout("replay_unsafe_requests mds%d\n", session->s_mds);
2704
2705        mutex_lock(&mdsc->mutex);
2706        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2707                err = __prepare_send_request(mdsc, req, session->s_mds, true);
2708                if (!err) {
2709                        ceph_msg_get(req->r_request);
2710                        ceph_con_send(&session->s_con, req->r_request);
2711                }
2712        }
2713
2714        /*
2715         * also re-send old requests when MDS enters reconnect stage. So that MDS
2716         * can process completed request in clientreplay stage.
2717         */
2718        p = rb_first(&mdsc->request_tree);
2719        while (p) {
2720                req = rb_entry(p, struct ceph_mds_request, r_node);
2721                p = rb_next(p);
2722                if (req->r_got_unsafe)
2723                        continue;
2724                if (req->r_attempts == 0)
2725                        continue; /* only old requests */
2726                if (req->r_session &&
2727                    req->r_session->s_mds == session->s_mds) {
2728                        err = __prepare_send_request(mdsc, req,
2729                                                     session->s_mds, true);
2730                        if (!err) {
2731                                ceph_msg_get(req->r_request);
2732                                ceph_con_send(&session->s_con, req->r_request);
2733                        }
2734                }
2735        }
2736        mutex_unlock(&mdsc->mutex);
2737}
2738
2739/*
2740 * Encode information about a cap for a reconnect with the MDS.
2741 */
2742static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2743                          void *arg)
2744{
2745        union {
2746                struct ceph_mds_cap_reconnect v2;
2747                struct ceph_mds_cap_reconnect_v1 v1;
2748        } rec;
2749        struct ceph_inode_info *ci;
2750        struct ceph_reconnect_state *recon_state = arg;
2751        struct ceph_pagelist *pagelist = recon_state->pagelist;
2752        char *path;
2753        int pathlen, err;
2754        u64 pathbase;
2755        u64 snap_follows;
2756        struct dentry *dentry;
2757
2758        ci = cap->ci;
2759
2760        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2761             inode, ceph_vinop(inode), cap, cap->cap_id,
2762             ceph_cap_string(cap->issued));
2763        err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2764        if (err)
2765                return err;
2766
2767        dentry = d_find_alias(inode);
2768        if (dentry) {
2769                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2770                if (IS_ERR(path)) {
2771                        err = PTR_ERR(path);
2772                        goto out_dput;
2773                }
2774        } else {
2775                path = NULL;
2776                pathlen = 0;
2777                pathbase = 0;
2778        }
2779
2780        spin_lock(&ci->i_ceph_lock);
2781        cap->seq = 0;        /* reset cap seq */
2782        cap->issue_seq = 0;  /* and issue_seq */
2783        cap->mseq = 0;       /* and migrate_seq */
2784        cap->cap_gen = cap->session->s_cap_gen;
2785
2786        if (recon_state->msg_version >= 2) {
2787                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2788                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2789                rec.v2.issued = cpu_to_le32(cap->issued);
2790                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2791                rec.v2.pathbase = cpu_to_le64(pathbase);
2792                rec.v2.flock_len = 0;
2793        } else {
2794                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2795                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2796                rec.v1.issued = cpu_to_le32(cap->issued);
2797                rec.v1.size = cpu_to_le64(inode->i_size);
2798                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2799                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2800                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2801                rec.v1.pathbase = cpu_to_le64(pathbase);
2802        }
2803
2804        if (list_empty(&ci->i_cap_snaps)) {
2805                snap_follows = 0;
2806        } else {
2807                struct ceph_cap_snap *capsnap =
2808                        list_first_entry(&ci->i_cap_snaps,
2809                                         struct ceph_cap_snap, ci_item);
2810                snap_follows = capsnap->follows;
2811        }
2812        spin_unlock(&ci->i_ceph_lock);
2813
2814        if (recon_state->msg_version >= 2) {
2815                int num_fcntl_locks, num_flock_locks;
2816                struct ceph_filelock *flocks;
2817                size_t struct_len, total_len = 0;
2818                u8 struct_v = 0;
2819
2820encode_again:
2821                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2822                flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2823                                 sizeof(struct ceph_filelock), GFP_NOFS);
2824                if (!flocks) {
2825                        err = -ENOMEM;
2826                        goto out_free;
2827                }
2828                err = ceph_encode_locks_to_buffer(inode, flocks,
2829                                                  num_fcntl_locks,
2830                                                  num_flock_locks);
2831                if (err) {
2832                        kfree(flocks);
2833                        if (err == -ENOSPC)
2834                                goto encode_again;
2835                        goto out_free;
2836                }
2837
2838                if (recon_state->msg_version >= 3) {
2839                        /* version, compat_version and struct_len */
2840                        total_len = 2 * sizeof(u8) + sizeof(u32);
2841                        struct_v = 2;
2842                }
2843                /*
2844                 * number of encoded locks is stable, so copy to pagelist
2845                 */
2846                struct_len = 2 * sizeof(u32) +
2847                            (num_fcntl_locks + num_flock_locks) *
2848                            sizeof(struct ceph_filelock);
2849                rec.v2.flock_len = cpu_to_le32(struct_len);
2850
2851                struct_len += sizeof(rec.v2);
2852                struct_len += sizeof(u32) + pathlen;
2853
2854                if (struct_v >= 2)
2855                        struct_len += sizeof(u64); /* snap_follows */
2856
2857                total_len += struct_len;
2858                err = ceph_pagelist_reserve(pagelist, total_len);
2859
2860                if (!err) {
2861                        if (recon_state->msg_version >= 3) {
2862                                ceph_pagelist_encode_8(pagelist, struct_v);
2863                                ceph_pagelist_encode_8(pagelist, 1);
2864                                ceph_pagelist_encode_32(pagelist, struct_len);
2865                        }
2866                        ceph_pagelist_encode_string(pagelist, path, pathlen);
2867                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
2868                        ceph_locks_to_pagelist(flocks, pagelist,
2869                                               num_fcntl_locks,
2870                                               num_flock_locks);
2871                        if (struct_v >= 2)
2872                                ceph_pagelist_encode_64(pagelist, snap_follows);
2873                }
2874                kfree(flocks);
2875        } else {
2876                size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
2877                err = ceph_pagelist_reserve(pagelist, size);
2878                if (!err) {
2879                        ceph_pagelist_encode_string(pagelist, path, pathlen);
2880                        ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
2881                }
2882        }
2883
2884        recon_state->nr_caps++;
2885out_free:
2886        kfree(path);
2887out_dput:
2888        dput(dentry);
2889        return err;
2890}
2891
2892
2893/*
2894 * If an MDS fails and recovers, clients need to reconnect in order to
2895 * reestablish shared state.  This includes all caps issued through
2896 * this session _and_ the snap_realm hierarchy.  Because it's not
2897 * clear which snap realms the mds cares about, we send everything we
2898 * know about.. that ensures we'll then get any new info the
2899 * recovering MDS might have.
2900 *
2901 * This is a relatively heavyweight operation, but it's rare.
2902 *
2903 * called with mdsc->mutex held.
2904 */
2905static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2906                               struct ceph_mds_session *session)
2907{
2908        struct ceph_msg *reply;
2909        struct rb_node *p;
2910        int mds = session->s_mds;
2911        int err = -ENOMEM;
2912        int s_nr_caps;
2913        struct ceph_pagelist *pagelist;
2914        struct ceph_reconnect_state recon_state;
2915
2916        pr_info("mds%d reconnect start\n", mds);
2917
2918        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2919        if (!pagelist)
2920                goto fail_nopagelist;
2921        ceph_pagelist_init(pagelist);
2922
2923        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2924        if (!reply)
2925                goto fail_nomsg;
2926
2927        mutex_lock(&session->s_mutex);
2928        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2929        session->s_seq = 0;
2930
2931        dout("session %p state %s\n", session,
2932             ceph_session_state_name(session->s_state));
2933
2934        spin_lock(&session->s_gen_ttl_lock);
2935        session->s_cap_gen++;
2936        spin_unlock(&session->s_gen_ttl_lock);
2937
2938        spin_lock(&session->s_cap_lock);
2939        /* don't know if session is readonly */
2940        session->s_readonly = 0;
2941        /*
2942         * notify __ceph_remove_cap() that we are composing cap reconnect.
2943         * If a cap get released before being added to the cap reconnect,
2944         * __ceph_remove_cap() should skip queuing cap release.
2945         */
2946        session->s_cap_reconnect = 1;
2947        /* drop old cap expires; we're about to reestablish that state */
2948        cleanup_cap_releases(mdsc, session);
2949
2950        /* trim unused caps to reduce MDS's cache rejoin time */
2951        if (mdsc->fsc->sb->s_root)
2952                shrink_dcache_parent(mdsc->fsc->sb->s_root);
2953
2954        ceph_con_close(&session->s_con);
2955        ceph_con_open(&session->s_con,
2956                      CEPH_ENTITY_TYPE_MDS, mds,
2957                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2958
2959        /* replay unsafe requests */
2960        replay_unsafe_requests(mdsc, session);
2961
2962        down_read(&mdsc->snap_rwsem);
2963
2964        /* traverse this session's caps */
2965        s_nr_caps = session->s_nr_caps;
2966        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2967        if (err)
2968                goto fail;
2969
2970        recon_state.nr_caps = 0;
2971        recon_state.pagelist = pagelist;
2972        if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
2973                recon_state.msg_version = 3;
2974        else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
2975                recon_state.msg_version = 2;
2976        else
2977                recon_state.msg_version = 1;
2978        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2979        if (err < 0)
2980                goto fail;
2981
2982        spin_lock(&session->s_cap_lock);
2983        session->s_cap_reconnect = 0;
2984        spin_unlock(&session->s_cap_lock);
2985
2986        /*
2987         * snaprealms.  we provide mds with the ino, seq (version), and
2988         * parent for all of our realms.  If the mds has any newer info,
2989         * it will tell us.
2990         */
2991        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2992                struct ceph_snap_realm *realm =
2993                        rb_entry(p, struct ceph_snap_realm, node);
2994                struct ceph_mds_snaprealm_reconnect sr_rec;
2995
2996                dout(" adding snap realm %llx seq %lld parent %llx\n",
2997                     realm->ino, realm->seq, realm->parent_ino);
2998                sr_rec.ino = cpu_to_le64(realm->ino);
2999                sr_rec.seq = cpu_to_le64(realm->seq);
3000                sr_rec.parent = cpu_to_le64(realm->parent_ino);
3001                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3002                if (err)
3003                        goto fail;
3004        }
3005
3006        reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3007
3008        /* raced with cap release? */
3009        if (s_nr_caps != recon_state.nr_caps) {
3010                struct page *page = list_first_entry(&pagelist->head,
3011                                                     struct page, lru);
3012                __le32 *addr = kmap_atomic(page);
3013                *addr = cpu_to_le32(recon_state.nr_caps);
3014                kunmap_atomic(addr);
3015        }
3016
3017        reply->hdr.data_len = cpu_to_le32(pagelist->length);
3018        ceph_msg_data_add_pagelist(reply, pagelist);
3019
3020        ceph_early_kick_flushing_caps(mdsc, session);
3021
3022        ceph_con_send(&session->s_con, reply);
3023
3024        mutex_unlock(&session->s_mutex);
3025
3026        mutex_lock(&mdsc->mutex);
3027        __wake_requests(mdsc, &session->s_waiting);
3028        mutex_unlock(&mdsc->mutex);
3029
3030        up_read(&mdsc->snap_rwsem);
3031        return;
3032
3033fail:
3034        ceph_msg_put(reply);
3035        up_read(&mdsc->snap_rwsem);
3036        mutex_unlock(&session->s_mutex);
3037fail_nomsg:
3038        ceph_pagelist_release(pagelist);
3039fail_nopagelist:
3040        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3041        return;
3042}
3043
3044
3045/*
3046 * compare old and new mdsmaps, kicking requests
3047 * and closing out old connections as necessary
3048 *
3049 * called under mdsc->mutex.
3050 */
3051static void check_new_map(struct ceph_mds_client *mdsc,
3052                          struct ceph_mdsmap *newmap,
3053                          struct ceph_mdsmap *oldmap)
3054{
3055        int i;
3056        int oldstate, newstate;
3057        struct ceph_mds_session *s;
3058
3059        dout("check_new_map new %u old %u\n",
3060             newmap->m_epoch, oldmap->m_epoch);
3061
3062        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
3063                if (mdsc->sessions[i] == NULL)
3064                        continue;
3065                s = mdsc->sessions[i];
3066                oldstate = ceph_mdsmap_get_state(oldmap, i);
3067                newstate = ceph_mdsmap_get_state(newmap, i);
3068
3069                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3070                     i, ceph_mds_state_name(oldstate),
3071                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3072                     ceph_mds_state_name(newstate),
3073                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3074                     ceph_session_state_name(s->s_state));
3075
3076                if (i >= newmap->m_max_mds ||
3077                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
3078                           ceph_mdsmap_get_addr(newmap, i),
3079                           sizeof(struct ceph_entity_addr))) {
3080                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3081                                /* the session never opened, just close it
3082                                 * out now */
3083                                __wake_requests(mdsc, &s->s_waiting);
3084                                __unregister_session(mdsc, s);
3085                        } else {
3086                                /* just close it */
3087                                mutex_unlock(&mdsc->mutex);
3088                                mutex_lock(&s->s_mutex);
3089                                mutex_lock(&mdsc->mutex);
3090                                ceph_con_close(&s->s_con);
3091                                mutex_unlock(&s->s_mutex);
3092                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
3093                        }
3094                } else if (oldstate == newstate) {
3095                        continue;  /* nothing new with this mds */
3096                }
3097
3098                /*
3099                 * send reconnect?
3100                 */
3101                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3102                    newstate >= CEPH_MDS_STATE_RECONNECT) {
3103                        mutex_unlock(&mdsc->mutex);
3104                        send_mds_reconnect(mdsc, s);
3105                        mutex_lock(&mdsc->mutex);
3106                }
3107
3108                /*
3109                 * kick request on any mds that has gone active.
3110                 */
3111                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3112                    newstate >= CEPH_MDS_STATE_ACTIVE) {
3113                        if (oldstate != CEPH_MDS_STATE_CREATING &&
3114                            oldstate != CEPH_MDS_STATE_STARTING)
3115                                pr_info("mds%d recovery completed\n", s->s_mds);
3116                        kick_requests(mdsc, i);
3117                        ceph_kick_flushing_caps(mdsc, s);
3118                        wake_up_session_caps(s, 1);
3119                }
3120        }
3121
3122        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
3123                s = mdsc->sessions[i];
3124                if (!s)
3125                        continue;
3126                if (!ceph_mdsmap_is_laggy(newmap, i))
3127                        continue;
3128                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3129                    s->s_state == CEPH_MDS_SESSION_HUNG ||
3130                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
3131                        dout(" connecting to export targets of laggy mds%d\n",
3132                             i);
3133                        __open_export_target_sessions(mdsc, s);
3134                }
3135        }
3136}
3137
3138
3139
3140/*
3141 * leases
3142 */
3143
3144/*
3145 * caller must hold session s_mutex, dentry->d_lock
3146 */
3147void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3148{
3149        struct ceph_dentry_info *di = ceph_dentry(dentry);
3150
3151        ceph_put_mds_session(di->lease_session);
3152        di->lease_session = NULL;
3153}
3154
3155static void handle_lease(struct ceph_mds_client *mdsc,
3156                         struct ceph_mds_session *session,
3157                         struct ceph_msg *msg)
3158{
3159        struct super_block *sb = mdsc->fsc->sb;
3160        struct inode *inode;
3161        struct dentry *parent, *dentry;
3162        struct ceph_dentry_info *di;
3163        int mds = session->s_mds;
3164        struct ceph_mds_lease *h = msg->front.iov_base;
3165        u32 seq;
3166        struct ceph_vino vino;
3167        struct qstr dname;
3168        int release = 0;
3169
3170        dout("handle_lease from mds%d\n", mds);
3171
3172        /* decode */
3173        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3174                goto bad;
3175        vino.ino = le64_to_cpu(h->ino);
3176        vino.snap = CEPH_NOSNAP;
3177        seq = le32_to_cpu(h->seq);
3178        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3179        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3180        if (dname.len != get_unaligned_le32(h+1))
3181                goto bad;
3182
3183        /* lookup inode */
3184        inode = ceph_find_inode(sb, vino);
3185        dout("handle_lease %s, ino %llx %p %.*s\n",
3186             ceph_lease_op_name(h->action), vino.ino, inode,
3187             dname.len, dname.name);
3188
3189        mutex_lock(&session->s_mutex);
3190        session->s_seq++;
3191
3192        if (inode == NULL) {
3193                dout("handle_lease no inode %llx\n", vino.ino);
3194                goto release;
3195        }
3196
3197        /* dentry */
3198        parent = d_find_alias(inode);
3199        if (!parent) {
3200                dout("no parent dentry on inode %p\n", inode);
3201                WARN_ON(1);
3202                goto release;  /* hrm... */
3203        }
3204        dname.hash = full_name_hash(parent, dname.name, dname.len);
3205        dentry = d_lookup(parent, &dname);
3206        dput(parent);
3207        if (!dentry)
3208                goto release;
3209
3210        spin_lock(&dentry->d_lock);
3211        di = ceph_dentry(dentry);
3212        switch (h->action) {
3213        case CEPH_MDS_LEASE_REVOKE:
3214                if (di->lease_session == session) {
3215                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3216                                h->seq = cpu_to_le32(di->lease_seq);
3217                        __ceph_mdsc_drop_dentry_lease(dentry);
3218                }
3219                release = 1;
3220                break;
3221
3222        case CEPH_MDS_LEASE_RENEW:
3223                if (di->lease_session == session &&
3224                    di->lease_gen == session->s_cap_gen &&
3225                    di->lease_renew_from &&
3226                    di->lease_renew_after == 0) {
3227                        unsigned long duration =
3228                                msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3229
3230                        di->lease_seq = seq;
3231                        di->time = di->lease_renew_from + duration;
3232                        di->lease_renew_after = di->lease_renew_from +
3233                                (duration >> 1);
3234                        di->lease_renew_from = 0;
3235                }
3236                break;
3237        }
3238        spin_unlock(&dentry->d_lock);
3239        dput(dentry);
3240
3241        if (!release)
3242                goto out;
3243
3244release:
3245        /* let's just reuse the same message */
3246        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3247        ceph_msg_get(msg);
3248        ceph_con_send(&session->s_con, msg);
3249
3250out:
3251        iput(inode);
3252        mutex_unlock(&session->s_mutex);
3253        return;
3254
3255bad:
3256        pr_err("corrupt lease message\n");
3257        ceph_msg_dump(msg);
3258}
3259
3260void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3261                              struct inode *inode,
3262                              struct dentry *dentry, char action,
3263                              u32 seq)
3264{
3265        struct ceph_msg *msg;
3266        struct ceph_mds_lease *lease;
3267        int len = sizeof(*lease) + sizeof(u32);
3268        int dnamelen = 0;
3269
3270        dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3271             inode, dentry, ceph_lease_op_name(action), session->s_mds);
3272        dnamelen = dentry->d_name.len;
3273        len += dnamelen;
3274
3275        msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3276        if (!msg)
3277                return;
3278        lease = msg->front.iov_base;
3279        lease->action = action;
3280        lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3281        lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3282        lease->seq = cpu_to_le32(seq);
3283        put_unaligned_le32(dnamelen, lease + 1);
3284        memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3285
3286        /*
3287         * if this is a preemptive lease RELEASE, no need to
3288         * flush request stream, since the actual request will
3289         * soon follow.
3290         */
3291        msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3292
3293        ceph_con_send(&session->s_con, msg);
3294}
3295
3296/*
3297 * drop all leases (and dentry refs) in preparation for umount
3298 */
3299static void drop_leases(struct ceph_mds_client *mdsc)
3300{
3301        int i;
3302
3303        dout("drop_leases\n");
3304        mutex_lock(&mdsc->mutex);
3305        for (i = 0; i < mdsc->max_sessions; i++) {
3306                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3307                if (!s)
3308                        continue;
3309                mutex_unlock(&mdsc->mutex);
3310                mutex_lock(&s->s_mutex);
3311                mutex_unlock(&s->s_mutex);
3312                ceph_put_mds_session(s);
3313                mutex_lock(&mdsc->mutex);
3314        }
3315        mutex_unlock(&mdsc->mutex);
3316}
3317
3318
3319
3320/*
3321 * delayed work -- periodically trim expired leases, renew caps with mds
3322 */
3323static void schedule_delayed(struct ceph_mds_client *mdsc)
3324{
3325        int delay = 5;
3326        unsigned hz = round_jiffies_relative(HZ * delay);
3327        schedule_delayed_work(&mdsc->delayed_work, hz);
3328}
3329
3330static void delayed_work(struct work_struct *work)
3331{
3332        int i;
3333        struct ceph_mds_client *mdsc =
3334                container_of(work, struct ceph_mds_client, delayed_work.work);
3335        int renew_interval;
3336        int renew_caps;
3337
3338        dout("mdsc delayed_work\n");
3339        ceph_check_delayed_caps(mdsc);
3340
3341        mutex_lock(&mdsc->mutex);
3342        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3343        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3344                                   mdsc->last_renew_caps);
3345        if (renew_caps)
3346                mdsc->last_renew_caps = jiffies;
3347
3348        for (i = 0; i < mdsc->max_sessions; i++) {
3349                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3350                if (s == NULL)
3351                        continue;
3352                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3353                        dout("resending session close request for mds%d\n",
3354                             s->s_mds);
3355                        request_close_session(mdsc, s);
3356                        ceph_put_mds_session(s);
3357                        continue;
3358                }
3359                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3360                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3361                                s->s_state = CEPH_MDS_SESSION_HUNG;
3362                                pr_info("mds%d hung\n", s->s_mds);
3363                        }
3364                }
3365                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3366                        /* this mds is failed or recovering, just wait */
3367                        ceph_put_mds_session(s);
3368                        continue;
3369                }
3370                mutex_unlock(&mdsc->mutex);
3371
3372                mutex_lock(&s->s_mutex);
3373                if (renew_caps)
3374                        send_renew_caps(mdsc, s);
3375                else
3376                        ceph_con_keepalive(&s->s_con);
3377                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3378                    s->s_state == CEPH_MDS_SESSION_HUNG)
3379                        ceph_send_cap_releases(mdsc, s);
3380                mutex_unlock(&s->s_mutex);
3381                ceph_put_mds_session(s);
3382
3383                mutex_lock(&mdsc->mutex);
3384        }
3385        mutex_unlock(&mdsc->mutex);
3386
3387        schedule_delayed(mdsc);
3388}
3389
3390int ceph_mdsc_init(struct ceph_fs_client *fsc)
3391
3392{
3393        struct ceph_mds_client *mdsc;
3394
3395        mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3396        if (!mdsc)
3397                return -ENOMEM;
3398        mdsc->fsc = fsc;
3399        fsc->mdsc = mdsc;
3400        mutex_init(&mdsc->mutex);
3401        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3402        if (mdsc->mdsmap == NULL) {
3403                kfree(mdsc);
3404                return -ENOMEM;
3405        }
3406
3407        init_completion(&mdsc->safe_umount_waiters);
3408        init_waitqueue_head(&mdsc->session_close_wq);
3409        INIT_LIST_HEAD(&mdsc->waiting_for_map);
3410        mdsc->sessions = NULL;
3411        atomic_set(&mdsc->num_sessions, 0);
3412        mdsc->max_sessions = 0;
3413        mdsc->stopping = 0;
3414        mdsc->last_snap_seq = 0;
3415        init_rwsem(&mdsc->snap_rwsem);
3416        mdsc->snap_realms = RB_ROOT;
3417        INIT_LIST_HEAD(&mdsc->snap_empty);
3418        spin_lock_init(&mdsc->snap_empty_lock);
3419        mdsc->last_tid = 0;
3420        mdsc->oldest_tid = 0;
3421        mdsc->request_tree = RB_ROOT;
3422        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3423        mdsc->last_renew_caps = jiffies;
3424        INIT_LIST_HEAD(&mdsc->cap_delay_list);
3425        spin_lock_init(&mdsc->cap_delay_lock);
3426        INIT_LIST_HEAD(&mdsc->snap_flush_list);
3427        spin_lock_init(&mdsc->snap_flush_lock);
3428        mdsc->last_cap_flush_tid = 1;
3429        INIT_LIST_HEAD(&mdsc->cap_flush_list);
3430        INIT_LIST_HEAD(&mdsc->cap_dirty);
3431        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3432        mdsc->num_cap_flushing = 0;
3433        spin_lock_init(&mdsc->cap_dirty_lock);
3434        init_waitqueue_head(&mdsc->cap_flushing_wq);
3435        spin_lock_init(&mdsc->dentry_lru_lock);
3436        INIT_LIST_HEAD(&mdsc->dentry_lru);
3437
3438        ceph_caps_init(mdsc);
3439        ceph_adjust_min_caps(mdsc, fsc->min_caps);
3440
3441        init_rwsem(&mdsc->pool_perm_rwsem);
3442        mdsc->pool_perm_tree = RB_ROOT;
3443
3444        return 0;
3445}
3446
3447/*
3448 * Wait for safe replies on open mds requests.  If we time out, drop
3449 * all requests from the tree to avoid dangling dentry refs.
3450 */
3451static void wait_requests(struct ceph_mds_client *mdsc)
3452{
3453        struct ceph_options *opts = mdsc->fsc->client->options;
3454        struct ceph_mds_request *req;
3455
3456        mutex_lock(&mdsc->mutex);
3457        if (__get_oldest_req(mdsc)) {
3458                mutex_unlock(&mdsc->mutex);
3459
3460                dout("wait_requests waiting for requests\n");
3461                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3462                                    ceph_timeout_jiffies(opts->mount_timeout));
3463
3464                /* tear down remaining requests */
3465                mutex_lock(&mdsc->mutex);
3466                while ((req = __get_oldest_req(mdsc))) {
3467                        dout("wait_requests timed out on tid %llu\n",
3468                             req->r_tid);
3469                        __unregister_request(mdsc, req);
3470                }
3471        }
3472        mutex_unlock(&mdsc->mutex);
3473        dout("wait_requests done\n");
3474}
3475
3476/*
3477 * called before mount is ro, and before dentries are torn down.
3478 * (hmm, does this still race with new lookups?)
3479 */
3480void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3481{
3482        dout("pre_umount\n");
3483        mdsc->stopping = 1;
3484
3485        drop_leases(mdsc);
3486        ceph_flush_dirty_caps(mdsc);
3487        wait_requests(mdsc);
3488
3489        /*
3490         * wait for reply handlers to drop their request refs and
3491         * their inode/dcache refs
3492         */
3493        ceph_msgr_flush();
3494}
3495
3496/*
3497 * wait for all write mds requests to flush.
3498 */
3499static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3500{
3501        struct ceph_mds_request *req = NULL, *nextreq;
3502        struct rb_node *n;
3503
3504        mutex_lock(&mdsc->mutex);
3505        dout("wait_unsafe_requests want %lld\n", want_tid);
3506restart:
3507        req = __get_oldest_req(mdsc);
3508        while (req && req->r_tid <= want_tid) {
3509                /* find next request */
3510                n = rb_next(&req->r_node);
3511                if (n)
3512                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3513                else
3514                        nextreq = NULL;
3515                if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3516                    (req->r_op & CEPH_MDS_OP_WRITE)) {
3517                        /* write op */
3518                        ceph_mdsc_get_request(req);
3519                        if (nextreq)
3520                                ceph_mdsc_get_request(nextreq);
3521                        mutex_unlock(&mdsc->mutex);
3522                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3523                             req->r_tid, want_tid);
3524                        wait_for_completion(&req->r_safe_completion);
3525                        mutex_lock(&mdsc->mutex);
3526                        ceph_mdsc_put_request(req);
3527                        if (!nextreq)
3528                                break;  /* next dne before, so we're done! */
3529                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
3530                                /* next request was removed from tree */
3531                                ceph_mdsc_put_request(nextreq);
3532                                goto restart;
3533                        }
3534                        ceph_mdsc_put_request(nextreq);  /* won't go away */
3535                }
3536                req = nextreq;
3537        }
3538        mutex_unlock(&mdsc->mutex);
3539        dout("wait_unsafe_requests done\n");
3540}
3541
3542void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3543{
3544        u64 want_tid, want_flush;
3545
3546        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3547                return;
3548
3549        dout("sync\n");
3550        mutex_lock(&mdsc->mutex);
3551        want_tid = mdsc->last_tid;
3552        mutex_unlock(&mdsc->mutex);
3553
3554        ceph_flush_dirty_caps(mdsc);
3555        spin_lock(&mdsc->cap_dirty_lock);
3556        want_flush = mdsc->last_cap_flush_tid;
3557        if (!list_empty(&mdsc->cap_flush_list)) {
3558                struct ceph_cap_flush *cf =
3559                        list_last_entry(&mdsc->cap_flush_list,
3560                                        struct ceph_cap_flush, g_list);
3561                cf->wake = true;
3562        }
3563        spin_unlock(&mdsc->cap_dirty_lock);
3564
3565        dout("sync want tid %lld flush_seq %lld\n",
3566             want_tid, want_flush);
3567
3568        wait_unsafe_requests(mdsc, want_tid);
3569        wait_caps_flush(mdsc, want_flush);
3570}
3571
3572/*
3573 * true if all sessions are closed, or we force unmount
3574 */
3575static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
3576{
3577        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3578                return true;
3579        return atomic_read(&mdsc->num_sessions) <= skipped;
3580}
3581
3582/*
3583 * called after sb is ro.
3584 */
3585void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3586{
3587        struct ceph_options *opts = mdsc->fsc->client->options;
3588        struct ceph_mds_session *session;
3589        int i;
3590        int skipped = 0;
3591
3592        dout("close_sessions\n");
3593
3594        /* close sessions */
3595        mutex_lock(&mdsc->mutex);
3596        for (i = 0; i < mdsc->max_sessions; i++) {
3597                session = __ceph_lookup_mds_session(mdsc, i);
3598                if (!session)
3599                        continue;
3600                mutex_unlock(&mdsc->mutex);
3601                mutex_lock(&session->s_mutex);
3602                if (__close_session(mdsc, session) <= 0)
3603                        skipped++;
3604                mutex_unlock(&session->s_mutex);
3605                ceph_put_mds_session(session);
3606                mutex_lock(&mdsc->mutex);
3607        }
3608        mutex_unlock(&mdsc->mutex);
3609
3610        dout("waiting for sessions to close\n");
3611        wait_event_timeout(mdsc->session_close_wq,
3612                           done_closing_sessions(mdsc, skipped),
3613                           ceph_timeout_jiffies(opts->mount_timeout));
3614
3615        /* tear down remaining sessions */
3616        mutex_lock(&mdsc->mutex);
3617        for (i = 0; i < mdsc->max_sessions; i++) {
3618                if (mdsc->sessions[i]) {
3619                        session = get_session(mdsc->sessions[i]);
3620                        __unregister_session(mdsc, session);
3621                        mutex_unlock(&mdsc->mutex);
3622                        mutex_lock(&session->s_mutex);
3623                        remove_session_caps(session);
3624                        mutex_unlock(&session->s_mutex);
3625                        ceph_put_mds_session(session);
3626                        mutex_lock(&mdsc->mutex);
3627                }
3628        }
3629        WARN_ON(!list_empty(&mdsc->cap_delay_list));
3630        mutex_unlock(&mdsc->mutex);
3631
3632        ceph_cleanup_empty_realms(mdsc);
3633
3634        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3635
3636        dout("stopped\n");
3637}
3638
3639void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3640{
3641        struct ceph_mds_session *session;
3642        int mds;
3643
3644        dout("force umount\n");
3645
3646        mutex_lock(&mdsc->mutex);
3647        for (mds = 0; mds < mdsc->max_sessions; mds++) {
3648                session = __ceph_lookup_mds_session(mdsc, mds);
3649                if (!session)
3650                        continue;
3651                mutex_unlock(&mdsc->mutex);
3652                mutex_lock(&session->s_mutex);
3653                __close_session(mdsc, session);
3654                if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3655                        cleanup_session_requests(mdsc, session);
3656                        remove_session_caps(session);
3657                }
3658                mutex_unlock(&session->s_mutex);
3659                ceph_put_mds_session(session);
3660                mutex_lock(&mdsc->mutex);
3661                kick_requests(mdsc, mds);
3662        }
3663        __wake_requests(mdsc, &mdsc->waiting_for_map);
3664        mutex_unlock(&mdsc->mutex);
3665}
3666
3667static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3668{
3669        dout("stop\n");
3670        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3671        if (mdsc->mdsmap)
3672                ceph_mdsmap_destroy(mdsc->mdsmap);
3673        kfree(mdsc->sessions);
3674        ceph_caps_finalize(mdsc);
3675        ceph_pool_perm_destroy(mdsc);
3676}
3677
3678void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3679{
3680        struct ceph_mds_client *mdsc = fsc->mdsc;
3681
3682        dout("mdsc_destroy %p\n", mdsc);
3683        ceph_mdsc_stop(mdsc);
3684
3685        /* flush out any connection work with references to us */
3686        ceph_msgr_flush();
3687
3688        fsc->mdsc = NULL;
3689        kfree(mdsc);
3690        dout("mdsc_destroy %p done\n", mdsc);
3691}
3692
3693void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3694{
3695        struct ceph_fs_client *fsc = mdsc->fsc;
3696        const char *mds_namespace = fsc->mount_options->mds_namespace;
3697        void *p = msg->front.iov_base;
3698        void *end = p + msg->front.iov_len;
3699        u32 epoch;
3700        u32 map_len;
3701        u32 num_fs;
3702        u32 mount_fscid = (u32)-1;
3703        u8 struct_v, struct_cv;
3704        int err = -EINVAL;
3705
3706        ceph_decode_need(&p, end, sizeof(u32), bad);
3707        epoch = ceph_decode_32(&p);
3708
3709        dout("handle_fsmap epoch %u\n", epoch);
3710
3711        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3712        struct_v = ceph_decode_8(&p);
3713        struct_cv = ceph_decode_8(&p);
3714        map_len = ceph_decode_32(&p);
3715
3716        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
3717        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
3718
3719        num_fs = ceph_decode_32(&p);
3720        while (num_fs-- > 0) {
3721                void *info_p, *info_end;
3722                u32 info_len;
3723                u8 info_v, info_cv;
3724                u32 fscid, namelen;
3725
3726                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
3727                info_v = ceph_decode_8(&p);
3728                info_cv = ceph_decode_8(&p);
3729                info_len = ceph_decode_32(&p);
3730                ceph_decode_need(&p, end, info_len, bad);
3731                info_p = p;
3732                info_end = p + info_len;
3733                p = info_end;
3734
3735                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
3736                fscid = ceph_decode_32(&info_p);
3737                namelen = ceph_decode_32(&info_p);
3738                ceph_decode_need(&info_p, info_end, namelen, bad);
3739
3740                if (mds_namespace &&
3741                    strlen(mds_namespace) == namelen &&
3742                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
3743                        mount_fscid = fscid;
3744                        break;
3745                }
3746        }
3747
3748        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
3749        if (mount_fscid != (u32)-1) {
3750                fsc->client->monc.fs_cluster_id = mount_fscid;
3751                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
3752                                   0, true);
3753                ceph_monc_renew_subs(&fsc->client->monc);
3754        } else {
3755                err = -ENOENT;
3756                goto err_out;
3757        }
3758        return;
3759bad:
3760        pr_err("error decoding fsmap\n");
3761err_out:
3762        mutex_lock(&mdsc->mutex);
3763        mdsc->mdsmap_err = -ENOENT;
3764        __wake_requests(mdsc, &mdsc->waiting_for_map);
3765        mutex_unlock(&mdsc->mutex);
3766        return;
3767}
3768
3769/*
3770 * handle mds map update.
3771 */
3772void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3773{
3774        u32 epoch;
3775        u32 maplen;
3776        void *p = msg->front.iov_base;
3777        void *end = p + msg->front.iov_len;
3778        struct ceph_mdsmap *newmap, *oldmap;
3779        struct ceph_fsid fsid;
3780        int err = -EINVAL;
3781
3782        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3783        ceph_decode_copy(&p, &fsid, sizeof(fsid));
3784        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3785                return;
3786        epoch = ceph_decode_32(&p);
3787        maplen = ceph_decode_32(&p);
3788        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3789
3790        /* do we need it? */
3791        mutex_lock(&mdsc->mutex);
3792        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3793                dout("handle_map epoch %u <= our %u\n",
3794                     epoch, mdsc->mdsmap->m_epoch);
3795                mutex_unlock(&mdsc->mutex);
3796                return;
3797        }
3798
3799        newmap = ceph_mdsmap_decode(&p, end);
3800        if (IS_ERR(newmap)) {
3801                err = PTR_ERR(newmap);
3802                goto bad_unlock;
3803        }
3804
3805        /* swap into place */
3806        if (mdsc->mdsmap) {
3807                oldmap = mdsc->mdsmap;
3808                mdsc->mdsmap = newmap;
3809                check_new_map(mdsc, newmap, oldmap);
3810                ceph_mdsmap_destroy(oldmap);
3811        } else {
3812                mdsc->mdsmap = newmap;  /* first mds map */
3813        }
3814        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3815
3816        __wake_requests(mdsc, &mdsc->waiting_for_map);
3817        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3818                          mdsc->mdsmap->m_epoch);
3819
3820        mutex_unlock(&mdsc->mutex);
3821        schedule_delayed(mdsc);
3822        return;
3823
3824bad_unlock:
3825        mutex_unlock(&mdsc->mutex);
3826bad:
3827        pr_err("error decoding mdsmap %d\n", err);
3828        return;
3829}
3830
3831static struct ceph_connection *con_get(struct ceph_connection *con)
3832{
3833        struct ceph_mds_session *s = con->private;
3834
3835        if (get_session(s)) {
3836                dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3837                return con;
3838        }
3839        dout("mdsc con_get %p FAIL\n", s);
3840        return NULL;
3841}
3842
3843static void con_put(struct ceph_connection *con)
3844{
3845        struct ceph_mds_session *s = con->private;
3846
3847        dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3848        ceph_put_mds_session(s);
3849}
3850
3851/*
3852 * if the client is unresponsive for long enough, the mds will kill
3853 * the session entirely.
3854 */
3855static void peer_reset(struct ceph_connection *con)
3856{
3857        struct ceph_mds_session *s = con->private;
3858        struct ceph_mds_client *mdsc = s->s_mdsc;
3859
3860        pr_warn("mds%d closed our session\n", s->s_mds);
3861        send_mds_reconnect(mdsc, s);
3862}
3863
3864static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3865{
3866        struct ceph_mds_session *s = con->private;
3867        struct ceph_mds_client *mdsc = s->s_mdsc;
3868        int type = le16_to_cpu(msg->hdr.type);
3869
3870        mutex_lock(&mdsc->mutex);
3871        if (__verify_registered_session(mdsc, s) < 0) {
3872                mutex_unlock(&mdsc->mutex);
3873                goto out;
3874        }
3875        mutex_unlock(&mdsc->mutex);
3876
3877        switch (type) {
3878        case CEPH_MSG_MDS_MAP:
3879                ceph_mdsc_handle_mdsmap(mdsc, msg);
3880                break;
3881        case CEPH_MSG_FS_MAP_USER:
3882                ceph_mdsc_handle_fsmap(mdsc, msg);
3883                break;
3884        case CEPH_MSG_CLIENT_SESSION:
3885                handle_session(s, msg);
3886                break;
3887        case CEPH_MSG_CLIENT_REPLY:
3888                handle_reply(s, msg);
3889                break;
3890        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3891                handle_forward(mdsc, s, msg);
3892                break;
3893        case CEPH_MSG_CLIENT_CAPS:
3894                ceph_handle_caps(s, msg);
3895                break;
3896        case CEPH_MSG_CLIENT_SNAP:
3897                ceph_handle_snap(mdsc, s, msg);
3898                break;
3899        case CEPH_MSG_CLIENT_LEASE:
3900                handle_lease(mdsc, s, msg);
3901                break;
3902
3903        default:
3904                pr_err("received unknown message type %d %s\n", type,
3905                       ceph_msg_type_name(type));
3906        }
3907out:
3908        ceph_msg_put(msg);
3909}
3910
3911/*
3912 * authentication
3913 */
3914
3915/*
3916 * Note: returned pointer is the address of a structure that's
3917 * managed separately.  Caller must *not* attempt to free it.
3918 */
3919static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3920                                        int *proto, int force_new)
3921{
3922        struct ceph_mds_session *s = con->private;
3923        struct ceph_mds_client *mdsc = s->s_mdsc;
3924        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3925        struct ceph_auth_handshake *auth = &s->s_auth;
3926
3927        if (force_new && auth->authorizer) {
3928                ceph_auth_destroy_authorizer(auth->authorizer);
3929                auth->authorizer = NULL;
3930        }
3931        if (!auth->authorizer) {
3932                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3933                                                      auth);
3934                if (ret)
3935                        return ERR_PTR(ret);
3936        } else {
3937                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3938                                                      auth);
3939                if (ret)
3940                        return ERR_PTR(ret);
3941        }
3942        *proto = ac->protocol;
3943
3944        return auth;
3945}
3946
3947
3948static int verify_authorizer_reply(struct ceph_connection *con, int len)
3949{
3950        struct ceph_mds_session *s = con->private;
3951        struct ceph_mds_client *mdsc = s->s_mdsc;
3952        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3953
3954        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3955}
3956
3957static int invalidate_authorizer(struct ceph_connection *con)
3958{
3959        struct ceph_mds_session *s = con->private;
3960        struct ceph_mds_client *mdsc = s->s_mdsc;
3961        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3962
3963        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3964
3965        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3966}
3967
3968static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3969                                struct ceph_msg_header *hdr, int *skip)
3970{
3971        struct ceph_msg *msg;
3972        int type = (int) le16_to_cpu(hdr->type);
3973        int front_len = (int) le32_to_cpu(hdr->front_len);
3974
3975        if (con->in_msg)
3976                return con->in_msg;
3977
3978        *skip = 0;
3979        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3980        if (!msg) {
3981                pr_err("unable to allocate msg type %d len %d\n",
3982                       type, front_len);
3983                return NULL;
3984        }
3985
3986        return msg;
3987}
3988
3989static int mds_sign_message(struct ceph_msg *msg)
3990{
3991       struct ceph_mds_session *s = msg->con->private;
3992       struct ceph_auth_handshake *auth = &s->s_auth;
3993
3994       return ceph_auth_sign_message(auth, msg);
3995}
3996
3997static int mds_check_message_signature(struct ceph_msg *msg)
3998{
3999       struct ceph_mds_session *s = msg->con->private;
4000       struct ceph_auth_handshake *auth = &s->s_auth;
4001
4002       return ceph_auth_check_message_signature(auth, msg);
4003}
4004
4005static const struct ceph_connection_operations mds_con_ops = {
4006        .get = con_get,
4007        .put = con_put,
4008        .dispatch = dispatch,
4009        .get_authorizer = get_authorizer,
4010        .verify_authorizer_reply = verify_authorizer_reply,
4011        .invalidate_authorizer = invalidate_authorizer,
4012        .peer_reset = peer_reset,
4013        .alloc_msg = mds_alloc_msg,
4014        .sign_message = mds_sign_message,
4015        .check_message_signature = mds_check_message_signature,
4016};
4017
4018/* eof */
Note: See TracBrowser for help on using the repository browser.