source: src/linux/universal/linux-4.9/net/sunrpc/svc.c @ 31859

Last change on this file since 31859 was 31859, checked in by brainslayer, 11 days ago

kernel update

File size: 33.9 KB
Line 
1/*
2 * linux/net/sunrpc/svc.c
3 *
4 * High-level RPC service routines
5 *
6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
7 *
8 * Multiple threads pools and NUMAisation
9 * Copyright (c) 2006 Silicon Graphics, Inc.
10 * by Greg Banks <gnb@melbourne.sgi.com>
11 */
12
13#include <linux/linkage.h>
14#include <linux/sched.h>
15#include <linux/errno.h>
16#include <linux/net.h>
17#include <linux/in.h>
18#include <linux/mm.h>
19#include <linux/interrupt.h>
20#include <linux/module.h>
21#include <linux/kthread.h>
22#include <linux/slab.h>
23
24#include <linux/sunrpc/types.h>
25#include <linux/sunrpc/xdr.h>
26#include <linux/sunrpc/stats.h>
27#include <linux/sunrpc/svcsock.h>
28#include <linux/sunrpc/clnt.h>
29#include <linux/sunrpc/bc_xprt.h>
30
31#include <trace/events/sunrpc.h>
32
33#define RPCDBG_FACILITY RPCDBG_SVCDSP
34
35static void svc_unregister(const struct svc_serv *serv, struct net *net);
36
37#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)
38
39#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL
40
41/*
42 * Structure for mapping cpus to pools and vice versa.
43 * Setup once during sunrpc initialisation.
44 */
45struct svc_pool_map svc_pool_map = {
46        .mode = SVC_POOL_DEFAULT
47};
48EXPORT_SYMBOL_GPL(svc_pool_map);
49
50static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */
51
52static int
53param_set_pool_mode(const char *val, struct kernel_param *kp)
54{
55        int *ip = (int *)kp->arg;
56        struct svc_pool_map *m = &svc_pool_map;
57        int err;
58
59        mutex_lock(&svc_pool_map_mutex);
60
61        err = -EBUSY;
62        if (m->count)
63                goto out;
64
65        err = 0;
66        if (!strncmp(val, "auto", 4))
67                *ip = SVC_POOL_AUTO;
68        else if (!strncmp(val, "global", 6))
69                *ip = SVC_POOL_GLOBAL;
70        else if (!strncmp(val, "percpu", 6))
71                *ip = SVC_POOL_PERCPU;
72        else if (!strncmp(val, "pernode", 7))
73                *ip = SVC_POOL_PERNODE;
74        else
75                err = -EINVAL;
76
77out:
78        mutex_unlock(&svc_pool_map_mutex);
79        return err;
80}
81
82static int
83param_get_pool_mode(char *buf, struct kernel_param *kp)
84{
85        int *ip = (int *)kp->arg;
86
87        switch (*ip)
88        {
89        case SVC_POOL_AUTO:
90                return strlcpy(buf, "auto", 20);
91        case SVC_POOL_GLOBAL:
92                return strlcpy(buf, "global", 20);
93        case SVC_POOL_PERCPU:
94                return strlcpy(buf, "percpu", 20);
95        case SVC_POOL_PERNODE:
96                return strlcpy(buf, "pernode", 20);
97        default:
98                return sprintf(buf, "%d", *ip);
99        }
100}
101
102module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
103                 &svc_pool_map.mode, 0644);
104
105/*
106 * Detect best pool mapping mode heuristically,
107 * according to the machine's topology.
108 */
109static int
110svc_pool_map_choose_mode(void)
111{
112        unsigned int node;
113
114        if (nr_online_nodes > 1) {
115                /*
116                 * Actually have multiple NUMA nodes,
117                 * so split pools on NUMA node boundaries
118                 */
119                return SVC_POOL_PERNODE;
120        }
121
122        node = first_online_node;
123        if (nr_cpus_node(node) > 2) {
124                /*
125                 * Non-trivial SMP, or CONFIG_NUMA on
126                 * non-NUMA hardware, e.g. with a generic
127                 * x86_64 kernel on Xeons.  In this case we
128                 * want to divide the pools on cpu boundaries.
129                 */
130                return SVC_POOL_PERCPU;
131        }
132
133        /* default: one global pool */
134        return SVC_POOL_GLOBAL;
135}
136
137/*
138 * Allocate the to_pool[] and pool_to[] arrays.
139 * Returns 0 on success or an errno.
140 */
141static int
142svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
143{
144        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
145        if (!m->to_pool)
146                goto fail;
147        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
148        if (!m->pool_to)
149                goto fail_free;
150
151        return 0;
152
153fail_free:
154        kfree(m->to_pool);
155        m->to_pool = NULL;
156fail:
157        return -ENOMEM;
158}
159
160/*
161 * Initialise the pool map for SVC_POOL_PERCPU mode.
162 * Returns number of pools or <0 on error.
163 */
164static int
165svc_pool_map_init_percpu(struct svc_pool_map *m)
166{
167        unsigned int maxpools = nr_cpu_ids;
168        unsigned int pidx = 0;
169        unsigned int cpu;
170        int err;
171
172        err = svc_pool_map_alloc_arrays(m, maxpools);
173        if (err)
174                return err;
175
176        for_each_online_cpu(cpu) {
177                BUG_ON(pidx >= maxpools);
178                m->to_pool[cpu] = pidx;
179                m->pool_to[pidx] = cpu;
180                pidx++;
181        }
182        /* cpus brought online later all get mapped to pool0, sorry */
183
184        return pidx;
185};
186
187
188/*
189 * Initialise the pool map for SVC_POOL_PERNODE mode.
190 * Returns number of pools or <0 on error.
191 */
192static int
193svc_pool_map_init_pernode(struct svc_pool_map *m)
194{
195        unsigned int maxpools = nr_node_ids;
196        unsigned int pidx = 0;
197        unsigned int node;
198        int err;
199
200        err = svc_pool_map_alloc_arrays(m, maxpools);
201        if (err)
202                return err;
203
204        for_each_node_with_cpus(node) {
205                /* some architectures (e.g. SN2) have cpuless nodes */
206                BUG_ON(pidx > maxpools);
207                m->to_pool[node] = pidx;
208                m->pool_to[pidx] = node;
209                pidx++;
210        }
211        /* nodes brought online later all get mapped to pool0, sorry */
212
213        return pidx;
214}
215
216
217/*
218 * Add a reference to the global map of cpus to pools (and
219 * vice versa).  Initialise the map if we're the first user.
220 * Returns the number of pools.
221 */
222unsigned int
223svc_pool_map_get(void)
224{
225        struct svc_pool_map *m = &svc_pool_map;
226        int npools = -1;
227
228        mutex_lock(&svc_pool_map_mutex);
229
230        if (m->count++) {
231                mutex_unlock(&svc_pool_map_mutex);
232                return m->npools;
233        }
234
235        if (m->mode == SVC_POOL_AUTO)
236                m->mode = svc_pool_map_choose_mode();
237
238        switch (m->mode) {
239        case SVC_POOL_PERCPU:
240                npools = svc_pool_map_init_percpu(m);
241                break;
242        case SVC_POOL_PERNODE:
243                npools = svc_pool_map_init_pernode(m);
244                break;
245        }
246
247        if (npools < 0) {
248                /* default, or memory allocation failure */
249                npools = 1;
250                m->mode = SVC_POOL_GLOBAL;
251        }
252        m->npools = npools;
253
254        mutex_unlock(&svc_pool_map_mutex);
255        return m->npools;
256}
257EXPORT_SYMBOL_GPL(svc_pool_map_get);
258
259/*
260 * Drop a reference to the global map of cpus to pools.
261 * When the last reference is dropped, the map data is
262 * freed; this allows the sysadmin to change the pool
263 * mode using the pool_mode module option without
264 * rebooting or re-loading sunrpc.ko.
265 */
266void
267svc_pool_map_put(void)
268{
269        struct svc_pool_map *m = &svc_pool_map;
270
271        mutex_lock(&svc_pool_map_mutex);
272
273        if (!--m->count) {
274                kfree(m->to_pool);
275                m->to_pool = NULL;
276                kfree(m->pool_to);
277                m->pool_to = NULL;
278                m->npools = 0;
279        }
280
281        mutex_unlock(&svc_pool_map_mutex);
282}
283EXPORT_SYMBOL_GPL(svc_pool_map_put);
284
285static int svc_pool_map_get_node(unsigned int pidx)
286{
287        const struct svc_pool_map *m = &svc_pool_map;
288
289        if (m->count) {
290                if (m->mode == SVC_POOL_PERCPU)
291                        return cpu_to_node(m->pool_to[pidx]);
292                if (m->mode == SVC_POOL_PERNODE)
293                        return m->pool_to[pidx];
294        }
295        return NUMA_NO_NODE;
296}
297/*
298 * Set the given thread's cpus_allowed mask so that it
299 * will only run on cpus in the given pool.
300 */
301static inline void
302svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
303{
304        struct svc_pool_map *m = &svc_pool_map;
305        unsigned int node = m->pool_to[pidx];
306
307        /*
308         * The caller checks for sv_nrpools > 1, which
309         * implies that we've been initialized.
310         */
311        WARN_ON_ONCE(m->count == 0);
312        if (m->count == 0)
313                return;
314
315        switch (m->mode) {
316        case SVC_POOL_PERCPU:
317        {
318                set_cpus_allowed_ptr(task, cpumask_of(node));
319                break;
320        }
321        case SVC_POOL_PERNODE:
322        {
323                set_cpus_allowed_ptr(task, cpumask_of_node(node));
324                break;
325        }
326        }
327}
328
329/*
330 * Use the mapping mode to choose a pool for a given CPU.
331 * Used when enqueueing an incoming RPC.  Always returns
332 * a non-NULL pool pointer.
333 */
334struct svc_pool *
335svc_pool_for_cpu(struct svc_serv *serv, int cpu)
336{
337        struct svc_pool_map *m = &svc_pool_map;
338        unsigned int pidx = 0;
339
340        /*
341         * An uninitialised map happens in a pure client when
342         * lockd is brought up, so silently treat it the
343         * same as SVC_POOL_GLOBAL.
344         */
345        if (svc_serv_is_pooled(serv)) {
346                switch (m->mode) {
347                case SVC_POOL_PERCPU:
348                        pidx = m->to_pool[cpu];
349                        break;
350                case SVC_POOL_PERNODE:
351                        pidx = m->to_pool[cpu_to_node(cpu)];
352                        break;
353                }
354        }
355        return &serv->sv_pools[pidx % serv->sv_nrpools];
356}
357
358int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
359{
360        int err;
361
362        err = rpcb_create_local(net);
363        if (err)
364                return err;
365
366        /* Remove any stale portmap registrations */
367        svc_unregister(serv, net);
368        return 0;
369}
370EXPORT_SYMBOL_GPL(svc_rpcb_setup);
371
372void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
373{
374        svc_unregister(serv, net);
375        rpcb_put_local(net);
376}
377EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
378
379static int svc_uses_rpcbind(struct svc_serv *serv)
380{
381        struct svc_program      *progp;
382        unsigned int            i;
383
384        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
385                for (i = 0; i < progp->pg_nvers; i++) {
386                        if (progp->pg_vers[i] == NULL)
387                                continue;
388                        if (progp->pg_vers[i]->vs_hidden == 0)
389                                return 1;
390                }
391        }
392
393        return 0;
394}
395
396int svc_bind(struct svc_serv *serv, struct net *net)
397{
398        if (!svc_uses_rpcbind(serv))
399                return 0;
400        return svc_rpcb_setup(serv, net);
401}
402EXPORT_SYMBOL_GPL(svc_bind);
403
404#if defined(CONFIG_SUNRPC_BACKCHANNEL)
405static void
406__svc_init_bc(struct svc_serv *serv)
407{
408        INIT_LIST_HEAD(&serv->sv_cb_list);
409        spin_lock_init(&serv->sv_cb_lock);
410        init_waitqueue_head(&serv->sv_cb_waitq);
411}
412#else
413static void
414__svc_init_bc(struct svc_serv *serv)
415{
416}
417#endif
418
419/*
420 * Create an RPC service
421 */
422static struct svc_serv *
423__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
424             struct svc_serv_ops *ops)
425{
426        struct svc_serv *serv;
427        unsigned int vers;
428        unsigned int xdrsize;
429        unsigned int i;
430
431        if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
432                return NULL;
433        serv->sv_name      = prog->pg_name;
434        serv->sv_program   = prog;
435        serv->sv_nrthreads = 1;
436        serv->sv_stats     = prog->pg_stats;
437        if (bufsize > RPCSVC_MAXPAYLOAD)
438                bufsize = RPCSVC_MAXPAYLOAD;
439        serv->sv_max_payload = bufsize? bufsize : 4096;
440        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
441        serv->sv_ops = ops;
442        xdrsize = 0;
443        while (prog) {
444                prog->pg_lovers = prog->pg_nvers-1;
445                for (vers=0; vers<prog->pg_nvers ; vers++)
446                        if (prog->pg_vers[vers]) {
447                                prog->pg_hivers = vers;
448                                if (prog->pg_lovers > vers)
449                                        prog->pg_lovers = vers;
450                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
451                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
452                        }
453                prog = prog->pg_next;
454        }
455        serv->sv_xdrsize   = xdrsize;
456        INIT_LIST_HEAD(&serv->sv_tempsocks);
457        INIT_LIST_HEAD(&serv->sv_permsocks);
458        init_timer(&serv->sv_temptimer);
459        spin_lock_init(&serv->sv_lock);
460
461        __svc_init_bc(serv);
462
463        serv->sv_nrpools = npools;
464        serv->sv_pools =
465                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
466                        GFP_KERNEL);
467        if (!serv->sv_pools) {
468                kfree(serv);
469                return NULL;
470        }
471
472        for (i = 0; i < serv->sv_nrpools; i++) {
473                struct svc_pool *pool = &serv->sv_pools[i];
474
475                dprintk("svc: initialising pool %u for %s\n",
476                                i, serv->sv_name);
477
478                pool->sp_id = i;
479                INIT_LIST_HEAD(&pool->sp_sockets);
480                INIT_LIST_HEAD(&pool->sp_all_threads);
481                spin_lock_init(&pool->sp_lock);
482        }
483
484        return serv;
485}
486
487struct svc_serv *
488svc_create(struct svc_program *prog, unsigned int bufsize,
489           struct svc_serv_ops *ops)
490{
491        return __svc_create(prog, bufsize, /*npools*/1, ops);
492}
493EXPORT_SYMBOL_GPL(svc_create);
494
495struct svc_serv *
496svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
497                  struct svc_serv_ops *ops)
498{
499        struct svc_serv *serv;
500        unsigned int npools = svc_pool_map_get();
501
502        serv = __svc_create(prog, bufsize, npools, ops);
503        if (!serv)
504                goto out_err;
505        return serv;
506out_err:
507        svc_pool_map_put();
508        return NULL;
509}
510EXPORT_SYMBOL_GPL(svc_create_pooled);
511
512void svc_shutdown_net(struct svc_serv *serv, struct net *net)
513{
514        svc_close_net(serv, net);
515
516        if (serv->sv_ops->svo_shutdown)
517                serv->sv_ops->svo_shutdown(serv, net);
518}
519EXPORT_SYMBOL_GPL(svc_shutdown_net);
520
521/*
522 * Destroy an RPC service. Should be called with appropriate locking to
523 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
524 */
525void
526svc_destroy(struct svc_serv *serv)
527{
528        dprintk("svc: svc_destroy(%s, %d)\n",
529                                serv->sv_program->pg_name,
530                                serv->sv_nrthreads);
531
532        if (serv->sv_nrthreads) {
533                if (--(serv->sv_nrthreads) != 0) {
534                        svc_sock_update_bufs(serv);
535                        return;
536                }
537        } else
538                printk("svc_destroy: no threads for serv=%p!\n", serv);
539
540        del_timer_sync(&serv->sv_temptimer);
541
542        /*
543         * The last user is gone and thus all sockets have to be destroyed to
544         * the point. Check this.
545         */
546        BUG_ON(!list_empty(&serv->sv_permsocks));
547        BUG_ON(!list_empty(&serv->sv_tempsocks));
548
549        cache_clean_deferred(serv);
550
551        if (svc_serv_is_pooled(serv))
552                svc_pool_map_put();
553
554        kfree(serv->sv_pools);
555        kfree(serv);
556}
557EXPORT_SYMBOL_GPL(svc_destroy);
558
559/*
560 * Allocate an RPC server's buffer space.
561 * We allocate pages and place them in rq_argpages.
562 */
563static int
564svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
565{
566        unsigned int pages, arghi;
567
568        /* bc_xprt uses fore channel allocated buffers */
569        if (svc_is_backchannel(rqstp))
570                return 1;
571
572        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
573                                       * We assume one is at most one page
574                                       */
575        arghi = 0;
576        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
577        if (pages > RPCSVC_MAXPAGES)
578                pages = RPCSVC_MAXPAGES;
579        while (pages) {
580                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
581                if (!p)
582                        break;
583                rqstp->rq_pages[arghi++] = p;
584                pages--;
585        }
586        return pages == 0;
587}
588
589/*
590 * Release an RPC server buffer
591 */
592static void
593svc_release_buffer(struct svc_rqst *rqstp)
594{
595        unsigned int i;
596
597        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
598                if (rqstp->rq_pages[i])
599                        put_page(rqstp->rq_pages[i]);
600}
601
602struct svc_rqst *
603svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
604{
605        struct svc_rqst *rqstp;
606
607        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
608        if (!rqstp)
609                return rqstp;
610
611        __set_bit(RQ_BUSY, &rqstp->rq_flags);
612        spin_lock_init(&rqstp->rq_lock);
613        rqstp->rq_server = serv;
614        rqstp->rq_pool = pool;
615
616        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
617        if (!rqstp->rq_argp)
618                goto out_enomem;
619
620        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
621        if (!rqstp->rq_resp)
622                goto out_enomem;
623
624        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
625                goto out_enomem;
626
627        return rqstp;
628out_enomem:
629        svc_rqst_free(rqstp);
630        return NULL;
631}
632EXPORT_SYMBOL_GPL(svc_rqst_alloc);
633
634struct svc_rqst *
635svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
636{
637        struct svc_rqst *rqstp;
638
639        rqstp = svc_rqst_alloc(serv, pool, node);
640        if (!rqstp)
641                return ERR_PTR(-ENOMEM);
642
643        serv->sv_nrthreads++;
644        spin_lock_bh(&pool->sp_lock);
645        pool->sp_nrthreads++;
646        list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
647        spin_unlock_bh(&pool->sp_lock);
648        return rqstp;
649}
650EXPORT_SYMBOL_GPL(svc_prepare_thread);
651
652/*
653 * Choose a pool in which to create a new thread, for svc_set_num_threads
654 */
655static inline struct svc_pool *
656choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
657{
658        if (pool != NULL)
659                return pool;
660
661        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
662}
663
664/*
665 * Choose a thread to kill, for svc_set_num_threads
666 */
667static inline struct task_struct *
668choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
669{
670        unsigned int i;
671        struct task_struct *task = NULL;
672
673        if (pool != NULL) {
674                spin_lock_bh(&pool->sp_lock);
675        } else {
676                /* choose a pool in round-robin fashion */
677                for (i = 0; i < serv->sv_nrpools; i++) {
678                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
679                        spin_lock_bh(&pool->sp_lock);
680                        if (!list_empty(&pool->sp_all_threads))
681                                goto found_pool;
682                        spin_unlock_bh(&pool->sp_lock);
683                }
684                return NULL;
685        }
686
687found_pool:
688        if (!list_empty(&pool->sp_all_threads)) {
689                struct svc_rqst *rqstp;
690
691                /*
692                 * Remove from the pool->sp_all_threads list
693                 * so we don't try to kill it again.
694                 */
695                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
696                set_bit(RQ_VICTIM, &rqstp->rq_flags);
697                list_del_rcu(&rqstp->rq_all);
698                task = rqstp->rq_task;
699        }
700        spin_unlock_bh(&pool->sp_lock);
701
702        return task;
703}
704
705/*
706 * Create or destroy enough new threads to make the number
707 * of threads the given number.  If `pool' is non-NULL, applies
708 * only to threads in that pool, otherwise round-robins between
709 * all pools.  Caller must ensure that mutual exclusion between this and
710 * server startup or shutdown.
711 *
712 * Destroying threads relies on the service threads filling in
713 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
714 * has been created using svc_create_pooled().
715 *
716 * Based on code that used to be in nfsd_svc() but tweaked
717 * to be pool-aware.
718 */
719int
720svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
721{
722        struct svc_rqst *rqstp;
723        struct task_struct *task;
724        struct svc_pool *chosen_pool;
725        int error = 0;
726        unsigned int state = serv->sv_nrthreads-1;
727        int node;
728
729        if (pool == NULL) {
730                /* The -1 assumes caller has done a svc_get() */
731                nrservs -= (serv->sv_nrthreads-1);
732        } else {
733                spin_lock_bh(&pool->sp_lock);
734                nrservs -= pool->sp_nrthreads;
735                spin_unlock_bh(&pool->sp_lock);
736        }
737
738        /* create new threads */
739        while (nrservs > 0) {
740                nrservs--;
741                chosen_pool = choose_pool(serv, pool, &state);
742
743                node = svc_pool_map_get_node(chosen_pool->sp_id);
744                rqstp = svc_prepare_thread(serv, chosen_pool, node);
745                if (IS_ERR(rqstp)) {
746                        error = PTR_ERR(rqstp);
747                        break;
748                }
749
750                __module_get(serv->sv_ops->svo_module);
751                task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
752                                              node, "%s", serv->sv_name);
753                if (IS_ERR(task)) {
754                        error = PTR_ERR(task);
755                        module_put(serv->sv_ops->svo_module);
756                        svc_exit_thread(rqstp);
757                        break;
758                }
759
760                rqstp->rq_task = task;
761                if (serv->sv_nrpools > 1)
762                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
763
764                svc_sock_update_bufs(serv);
765                wake_up_process(task);
766        }
767        /* destroy old threads */
768        while (nrservs < 0 &&
769               (task = choose_victim(serv, pool, &state)) != NULL) {
770                send_sig(SIGINT, task, 1);
771                nrservs++;
772        }
773
774        return error;
775}
776EXPORT_SYMBOL_GPL(svc_set_num_threads);
777
778/*
779 * Called from a server thread as it's exiting. Caller must hold the "service
780 * mutex" for the service.
781 */
782void
783svc_rqst_free(struct svc_rqst *rqstp)
784{
785        svc_release_buffer(rqstp);
786        kfree(rqstp->rq_resp);
787        kfree(rqstp->rq_argp);
788        kfree(rqstp->rq_auth_data);
789        kfree_rcu(rqstp, rq_rcu_head);
790}
791EXPORT_SYMBOL_GPL(svc_rqst_free);
792
793void
794svc_exit_thread(struct svc_rqst *rqstp)
795{
796        struct svc_serv *serv = rqstp->rq_server;
797        struct svc_pool *pool = rqstp->rq_pool;
798
799        spin_lock_bh(&pool->sp_lock);
800        pool->sp_nrthreads--;
801        if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
802                list_del_rcu(&rqstp->rq_all);
803        spin_unlock_bh(&pool->sp_lock);
804
805        svc_rqst_free(rqstp);
806
807        /* Release the server */
808        if (serv)
809                svc_destroy(serv);
810}
811EXPORT_SYMBOL_GPL(svc_exit_thread);
812
813/*
814 * Register an "inet" protocol family netid with the local
815 * rpcbind daemon via an rpcbind v4 SET request.
816 *
817 * No netconfig infrastructure is available in the kernel, so
818 * we map IP_ protocol numbers to netids by hand.
819 *
820 * Returns zero on success; a negative errno value is returned
821 * if any error occurs.
822 */
823static int __svc_rpcb_register4(struct net *net, const u32 program,
824                                const u32 version,
825                                const unsigned short protocol,
826                                const unsigned short port)
827{
828        const struct sockaddr_in sin = {
829                .sin_family             = AF_INET,
830                .sin_addr.s_addr        = htonl(INADDR_ANY),
831                .sin_port               = htons(port),
832        };
833        const char *netid;
834        int error;
835
836        switch (protocol) {
837        case IPPROTO_UDP:
838                netid = RPCBIND_NETID_UDP;
839                break;
840        case IPPROTO_TCP:
841                netid = RPCBIND_NETID_TCP;
842                break;
843        default:
844                return -ENOPROTOOPT;
845        }
846
847        error = rpcb_v4_register(net, program, version,
848                                        (const struct sockaddr *)&sin, netid);
849
850        /*
851         * User space didn't support rpcbind v4, so retry this
852         * registration request with the legacy rpcbind v2 protocol.
853         */
854        if (error == -EPROTONOSUPPORT)
855                error = rpcb_register(net, program, version, protocol, port);
856
857        return error;
858}
859
860#if IS_ENABLED(CONFIG_IPV6)
861/*
862 * Register an "inet6" protocol family netid with the local
863 * rpcbind daemon via an rpcbind v4 SET request.
864 *
865 * No netconfig infrastructure is available in the kernel, so
866 * we map IP_ protocol numbers to netids by hand.
867 *
868 * Returns zero on success; a negative errno value is returned
869 * if any error occurs.
870 */
871static int __svc_rpcb_register6(struct net *net, const u32 program,
872                                const u32 version,
873                                const unsigned short protocol,
874                                const unsigned short port)
875{
876        const struct sockaddr_in6 sin6 = {
877                .sin6_family            = AF_INET6,
878                .sin6_addr              = IN6ADDR_ANY_INIT,
879                .sin6_port              = htons(port),
880        };
881        const char *netid;
882        int error;
883
884        switch (protocol) {
885        case IPPROTO_UDP:
886                netid = RPCBIND_NETID_UDP6;
887                break;
888        case IPPROTO_TCP:
889                netid = RPCBIND_NETID_TCP6;
890                break;
891        default:
892                return -ENOPROTOOPT;
893        }
894
895        error = rpcb_v4_register(net, program, version,
896                                        (const struct sockaddr *)&sin6, netid);
897
898        /*
899         * User space didn't support rpcbind version 4, so we won't
900         * use a PF_INET6 listener.
901         */
902        if (error == -EPROTONOSUPPORT)
903                error = -EAFNOSUPPORT;
904
905        return error;
906}
907#endif  /* IS_ENABLED(CONFIG_IPV6) */
908
909/*
910 * Register a kernel RPC service via rpcbind version 4.
911 *
912 * Returns zero on success; a negative errno value is returned
913 * if any error occurs.
914 */
915static int __svc_register(struct net *net, const char *progname,
916                          const u32 program, const u32 version,
917                          const int family,
918                          const unsigned short protocol,
919                          const unsigned short port)
920{
921        int error = -EAFNOSUPPORT;
922
923        switch (family) {
924        case PF_INET:
925                error = __svc_rpcb_register4(net, program, version,
926                                                protocol, port);
927                break;
928#if IS_ENABLED(CONFIG_IPV6)
929        case PF_INET6:
930                error = __svc_rpcb_register6(net, program, version,
931                                                protocol, port);
932#endif
933        }
934
935        return error;
936}
937
938/**
939 * svc_register - register an RPC service with the local portmapper
940 * @serv: svc_serv struct for the service to register
941 * @net: net namespace for the service to register
942 * @family: protocol family of service's listener socket
943 * @proto: transport protocol number to advertise
944 * @port: port to advertise
945 *
946 * Service is registered for any address in the passed-in protocol family
947 */
948int svc_register(const struct svc_serv *serv, struct net *net,
949                 const int family, const unsigned short proto,
950                 const unsigned short port)
951{
952        struct svc_program      *progp;
953        struct svc_version      *vers;
954        unsigned int            i;
955        int                     error = 0;
956
957        WARN_ON_ONCE(proto == 0 && port == 0);
958        if (proto == 0 && port == 0)
959                return -EINVAL;
960
961        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
962                for (i = 0; i < progp->pg_nvers; i++) {
963                        vers = progp->pg_vers[i];
964                        if (vers == NULL)
965                                continue;
966
967                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
968                                        progp->pg_name,
969                                        i,
970                                        proto == IPPROTO_UDP?  "udp" : "tcp",
971                                        port,
972                                        family,
973                                        vers->vs_hidden ?
974                                        " (but not telling portmap)" : "");
975
976                        if (vers->vs_hidden)
977                                continue;
978
979                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
980                                                i, family, proto, port);
981
982                        if (vers->vs_rpcb_optnl) {
983                                error = 0;
984                                continue;
985                        }
986
987                        if (error < 0) {
988                                printk(KERN_WARNING "svc: failed to register "
989                                        "%sv%u RPC service (errno %d).\n",
990                                        progp->pg_name, i, -error);
991                                break;
992                        }
993                }
994        }
995
996        return error;
997}
998
999/*
1000 * If user space is running rpcbind, it should take the v4 UNSET
1001 * and clear everything for this [program, version].  If user space
1002 * is running portmap, it will reject the v4 UNSET, but won't have
1003 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
1004 * in this case to clear all existing entries for [program, version].
1005 */
1006static void __svc_unregister(struct net *net, const u32 program, const u32 version,
1007                             const char *progname)
1008{
1009        int error;
1010
1011        error = rpcb_v4_register(net, program, version, NULL, "");
1012
1013        /*
1014         * User space didn't support rpcbind v4, so retry this
1015         * request with the legacy rpcbind v2 protocol.
1016         */
1017        if (error == -EPROTONOSUPPORT)
1018                error = rpcb_register(net, program, version, 0, 0);
1019
1020        dprintk("svc: %s(%sv%u), error %d\n",
1021                        __func__, progname, version, error);
1022}
1023
1024/*
1025 * All netids, bind addresses and ports registered for [program, version]
1026 * are removed from the local rpcbind database (if the service is not
1027 * hidden) to make way for a new instance of the service.
1028 *
1029 * The result of unregistration is reported via dprintk for those who want
1030 * verification of the result, but is otherwise not important.
1031 */
1032static void svc_unregister(const struct svc_serv *serv, struct net *net)
1033{
1034        struct svc_program *progp;
1035        unsigned long flags;
1036        unsigned int i;
1037
1038        clear_thread_flag(TIF_SIGPENDING);
1039
1040        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1041                for (i = 0; i < progp->pg_nvers; i++) {
1042                        if (progp->pg_vers[i] == NULL)
1043                                continue;
1044                        if (progp->pg_vers[i]->vs_hidden)
1045                                continue;
1046
1047                        dprintk("svc: attempting to unregister %sv%u\n",
1048                                progp->pg_name, i);
1049                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
1050                }
1051        }
1052
1053        spin_lock_irqsave(&current->sighand->siglock, flags);
1054        recalc_sigpending();
1055        spin_unlock_irqrestore(&current->sighand->siglock, flags);
1056}
1057
1058/*
1059 * dprintk the given error with the address of the client that caused it.
1060 */
1061#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1062static __printf(2, 3)
1063void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1064{
1065        struct va_format vaf;
1066        va_list args;
1067        char    buf[RPC_MAX_ADDRBUFLEN];
1068
1069        va_start(args, fmt);
1070
1071        vaf.fmt = fmt;
1072        vaf.va = &args;
1073
1074        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
1075
1076        va_end(args);
1077}
1078#else
1079static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
1080#endif
1081
1082/*
1083 * Common routine for processing the RPC request.
1084 */
1085static int
1086svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1087{
1088        struct svc_program      *progp;
1089        struct svc_version      *versp = NULL;  /* compiler food */
1090        struct svc_procedure    *procp = NULL;
1091        struct svc_serv         *serv = rqstp->rq_server;
1092        kxdrproc_t              xdr;
1093        __be32                  *statp;
1094        u32                     prog, vers, proc;
1095        __be32                  auth_stat, rpc_stat;
1096        int                     auth_res;
1097        __be32                  *reply_statp;
1098
1099        rpc_stat = rpc_success;
1100
1101        if (argv->iov_len < 6*4)
1102                goto err_short_len;
1103
1104        /* Will be turned off only in gss privacy case: */
1105        set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
1106        /* Will be turned off only when NFSv4 Sessions are used */
1107        set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
1108        clear_bit(RQ_DROPME, &rqstp->rq_flags);
1109
1110        /* Setup reply header */
1111        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
1112
1113        svc_putu32(resv, rqstp->rq_xid);
1114
1115        vers = svc_getnl(argv);
1116
1117        /* First words of reply: */
1118        svc_putnl(resv, 1);             /* REPLY */
1119
1120        if (vers != 2)          /* RPC version number */
1121                goto err_bad_rpc;
1122
1123        /* Save position in case we later decide to reject: */
1124        reply_statp = resv->iov_base + resv->iov_len;
1125
1126        svc_putnl(resv, 0);             /* ACCEPT */
1127
1128        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
1129        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
1130        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
1131
1132        for (progp = serv->sv_program; progp; progp = progp->pg_next)
1133                if (prog == progp->pg_prog)
1134                        break;
1135
1136        /*
1137         * Decode auth data, and add verifier to reply buffer.
1138         * We do this before anything else in order to get a decent
1139         * auth verifier.
1140         */
1141        auth_res = svc_authenticate(rqstp, &auth_stat);
1142        /* Also give the program a chance to reject this call: */
1143        if (auth_res == SVC_OK && progp) {
1144                auth_stat = rpc_autherr_badcred;
1145                auth_res = progp->pg_authenticate(rqstp);
1146        }
1147        switch (auth_res) {
1148        case SVC_OK:
1149                break;
1150        case SVC_GARBAGE:
1151                goto err_garbage;
1152        case SVC_SYSERR:
1153                rpc_stat = rpc_system_err;
1154                goto err_bad;
1155        case SVC_DENIED:
1156                goto err_bad_auth;
1157        case SVC_CLOSE:
1158                goto close;
1159        case SVC_DROP:
1160                goto dropit;
1161        case SVC_COMPLETE:
1162                goto sendit;
1163        }
1164
1165        if (progp == NULL)
1166                goto err_bad_prog;
1167
1168        if (vers >= progp->pg_nvers ||
1169          !(versp = progp->pg_vers[vers]))
1170                goto err_bad_vers;
1171
1172        procp = versp->vs_proc + proc;
1173        if (proc >= versp->vs_nproc || !procp->pc_func)
1174                goto err_bad_proc;
1175        rqstp->rq_procinfo = procp;
1176
1177        /* Syntactic check complete */
1178        serv->sv_stats->rpccnt++;
1179
1180        /* Build the reply header. */
1181        statp = resv->iov_base +resv->iov_len;
1182        svc_putnl(resv, RPC_SUCCESS);
1183
1184        /* Bump per-procedure stats counter */
1185        procp->pc_count++;
1186
1187        /* Initialize storage for argp and resp */
1188        memset(rqstp->rq_argp, 0, procp->pc_argsize);
1189        memset(rqstp->rq_resp, 0, procp->pc_ressize);
1190
1191        /* un-reserve some of the out-queue now that we have a
1192         * better idea of reply size
1193         */
1194        if (procp->pc_xdrressize)
1195                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1196
1197        /* Call the function that processes the request. */
1198        if (!versp->vs_dispatch) {
1199                /* Decode arguments */
1200                xdr = procp->pc_decode;
1201                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
1202                        goto err_garbage;
1203
1204                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
1205
1206                /* Encode reply */
1207                if (*statp == rpc_drop_reply ||
1208                    test_bit(RQ_DROPME, &rqstp->rq_flags)) {
1209                        if (procp->pc_release)
1210                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1211                        goto dropit;
1212                }
1213                if (*statp == rpc_autherr_badcred) {
1214                        if (procp->pc_release)
1215                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1216                        goto err_bad_auth;
1217                }
1218                if (*statp == rpc_success &&
1219                    (xdr = procp->pc_encode) &&
1220                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1221                        dprintk("svc: failed to encode reply\n");
1222                        /* serv->sv_stats->rpcsystemerr++; */
1223                        *statp = rpc_system_err;
1224                }
1225        } else {
1226                dprintk("svc: calling dispatcher\n");
1227                if (!versp->vs_dispatch(rqstp, statp)) {
1228                        /* Release reply info */
1229                        if (procp->pc_release)
1230                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1231                        goto dropit;
1232                }
1233        }
1234
1235        /* Check RPC status result */
1236        if (*statp != rpc_success)
1237                resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
1238
1239        /* Release reply info */
1240        if (procp->pc_release)
1241                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1242
1243        if (procp->pc_encode == NULL)
1244                goto dropit;
1245
1246 sendit:
1247        if (svc_authorise(rqstp))
1248                goto close;
1249        return 1;               /* Caller can now send it */
1250
1251 dropit:
1252        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
1253        dprintk("svc: svc_process dropit\n");
1254        return 0;
1255
1256 close:
1257        if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1258                svc_close_xprt(rqstp->rq_xprt);
1259        dprintk("svc: svc_process close\n");
1260        return 0;
1261
1262err_short_len:
1263        svc_printk(rqstp, "short len %Zd, dropping request\n",
1264                        argv->iov_len);
1265        goto close;
1266
1267err_bad_rpc:
1268        serv->sv_stats->rpcbadfmt++;
1269        svc_putnl(resv, 1);     /* REJECT */
1270        svc_putnl(resv, 0);     /* RPC_MISMATCH */
1271        svc_putnl(resv, 2);     /* Only RPCv2 supported */
1272        svc_putnl(resv, 2);
1273        goto sendit;
1274
1275err_bad_auth:
1276        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1277        serv->sv_stats->rpcbadauth++;
1278        /* Restore write pointer to location of accept status: */
1279        xdr_ressize_check(rqstp, reply_statp);
1280        svc_putnl(resv, 1);     /* REJECT */
1281        svc_putnl(resv, 1);     /* AUTH_ERROR */
1282        svc_putnl(resv, ntohl(auth_stat));      /* status */
1283        goto sendit;
1284
1285err_bad_prog:
1286        dprintk("svc: unknown program %d\n", prog);
1287        serv->sv_stats->rpcbadfmt++;
1288        svc_putnl(resv, RPC_PROG_UNAVAIL);
1289        goto sendit;
1290
1291err_bad_vers:
1292        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1293                       vers, prog, progp->pg_name);
1294
1295        serv->sv_stats->rpcbadfmt++;
1296        svc_putnl(resv, RPC_PROG_MISMATCH);
1297        svc_putnl(resv, progp->pg_lovers);
1298        svc_putnl(resv, progp->pg_hivers);
1299        goto sendit;
1300
1301err_bad_proc:
1302        svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1303
1304        serv->sv_stats->rpcbadfmt++;
1305        svc_putnl(resv, RPC_PROC_UNAVAIL);
1306        goto sendit;
1307
1308err_garbage:
1309        svc_printk(rqstp, "failed to decode args\n");
1310
1311        rpc_stat = rpc_garbage_args;
1312err_bad:
1313        serv->sv_stats->rpcbadfmt++;
1314        svc_putnl(resv, ntohl(rpc_stat));
1315        goto sendit;
1316}
1317
1318/*
1319 * Process the RPC request.
1320 */
1321int
1322svc_process(struct svc_rqst *rqstp)
1323{
1324        struct kvec             *argv = &rqstp->rq_arg.head[0];
1325        struct kvec             *resv = &rqstp->rq_res.head[0];
1326        struct svc_serv         *serv = rqstp->rq_server;
1327        u32                     dir;
1328
1329        /*
1330         * Setup response xdr_buf.
1331         * Initially it has just one page
1332         */
1333        rqstp->rq_next_page = &rqstp->rq_respages[1];
1334        resv->iov_base = page_address(rqstp->rq_respages[0]);
1335        resv->iov_len = 0;
1336        rqstp->rq_res.pages = rqstp->rq_respages + 1;
1337        rqstp->rq_res.len = 0;
1338        rqstp->rq_res.page_base = 0;
1339        rqstp->rq_res.page_len = 0;
1340        rqstp->rq_res.buflen = PAGE_SIZE;
1341        rqstp->rq_res.tail[0].iov_base = NULL;
1342        rqstp->rq_res.tail[0].iov_len = 0;
1343
1344        dir  = svc_getnl(argv);
1345        if (dir != 0) {
1346                /* direction != CALL */
1347                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1348                serv->sv_stats->rpcbadfmt++;
1349                goto out_drop;
1350        }
1351
1352        /* Returns 1 for send, 0 for drop */
1353        if (likely(svc_process_common(rqstp, argv, resv))) {
1354                int ret = svc_send(rqstp);
1355
1356                trace_svc_process(rqstp, ret);
1357                return ret;
1358        }
1359out_drop:
1360        trace_svc_process(rqstp, 0);
1361        svc_drop(rqstp);
1362        return 0;
1363}
1364EXPORT_SYMBOL_GPL(svc_process);
1365
1366#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1367/*
1368 * Process a backchannel RPC request that arrived over an existing
1369 * outbound connection
1370 */
1371int
1372bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1373               struct svc_rqst *rqstp)
1374{
1375        struct kvec     *argv = &rqstp->rq_arg.head[0];
1376        struct kvec     *resv = &rqstp->rq_res.head[0];
1377        struct rpc_task *task;
1378        int proc_error;
1379        int error;
1380
1381        dprintk("svc: %s(%p)\n", __func__, req);
1382
1383        /* Build the svc_rqst used by the common processing routine */
1384        rqstp->rq_xprt = serv->sv_bc_xprt;
1385        rqstp->rq_xid = req->rq_xid;
1386        rqstp->rq_prot = req->rq_xprt->prot;
1387        rqstp->rq_server = serv;
1388
1389        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
1390        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
1391        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
1392        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1393
1394        /* Adjust the argument buffer length */
1395        rqstp->rq_arg.len = req->rq_private_buf.len;
1396        if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
1397                rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
1398                rqstp->rq_arg.page_len = 0;
1399        } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
1400                        rqstp->rq_arg.page_len)
1401                rqstp->rq_arg.page_len = rqstp->rq_arg.len -
1402                        rqstp->rq_arg.head[0].iov_len;
1403        else
1404                rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
1405                        rqstp->rq_arg.page_len;
1406
1407        /* reset result send buffer "put" position */
1408        resv->iov_len = 0;
1409
1410        /*
1411         * Skip the next two words because they've already been
1412         * processed in the transport
1413         */
1414        svc_getu32(argv);       /* XID */
1415        svc_getnl(argv);        /* CALLDIR */
1416
1417        /* Parse and execute the bc call */
1418        proc_error = svc_process_common(rqstp, argv, resv);
1419
1420        atomic_inc(&req->rq_xprt->bc_free_slots);
1421        if (!proc_error) {
1422                /* Processing error: drop the request */
1423                xprt_free_bc_request(req);
1424                return 0;
1425        }
1426
1427        /* Finally, send the reply synchronously */
1428        memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
1429        task = rpc_run_bc_task(req);
1430        if (IS_ERR(task)) {
1431                error = PTR_ERR(task);
1432                goto out;
1433        }
1434
1435        WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
1436        error = task->tk_status;
1437        rpc_put_task(task);
1438
1439out:
1440        dprintk("svc: %s(), error=%d\n", __func__, error);
1441        return error;
1442}
1443EXPORT_SYMBOL_GPL(bc_svc_process);
1444#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1445
1446/*
1447 * Return (transport-specific) limit on the rpc payload.
1448 */
1449u32 svc_max_payload(const struct svc_rqst *rqstp)
1450{
1451        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1452
1453        if (rqstp->rq_server->sv_max_payload < max)
1454                max = rqstp->rq_server->sv_max_payload;
1455        return max;
1456}
1457EXPORT_SYMBOL_GPL(svc_max_payload);
Note: See TracBrowser for help on using the repository browser.