diff options
| author | Jens Axboe <axboe@kernel.dk> | 2021-02-15 13:25:53 -0700 |
|---|---|---|
| committer | Jens Axboe <axboe@kernel.dk> | 2021-02-15 13:41:54 -0700 |
| commit | 276f31457f375639fd79c9eaf975593e750cd7f2 (patch) | |
| tree | 1facab499b7edcb2ef73736429b2242394682f24 | |
| parent | 2190fb8ebaac8419fd31051ff69869c6971bedbb (diff) | |
| download | linux-block-276f31457f375639fd79c9eaf975593e750cd7f2.tar.gz | |
io-wq: fork worker threads from original task
Signed-off-by: Jens Axboe <axboe@kernel.dk>
| -rw-r--r-- | fs/io-wq.c | 256 | ||||
| -rw-r--r-- | fs/io-wq.h | 1 | ||||
| -rw-r--r-- | fs/io_uring.c | 9 | ||||
| -rw-r--r-- | include/linux/sched.h | 3 | ||||
| -rw-r--r-- | include/linux/sched/task.h | 1 | ||||
| -rw-r--r-- | kernel/fork.c | 2 |
6 files changed, 95 insertions, 177 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 79e1cbc7d3f77..5cb1629db7e13 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -13,12 +13,8 @@ #include <linux/sched/mm.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/kthread.h> #include <linux/rculist_nulls.h> -#include <linux/fs_struct.h> #include <linux/task_work.h> -#include <linux/blk-cgroup.h> -#include <linux/audit.h> #include <linux/cpu.h> #include "../kernel/sched/sched.h" @@ -58,13 +54,6 @@ struct io_worker { spinlock_t lock; struct rcu_head rcu; - struct mm_struct *mm; -#ifdef CONFIG_BLK_CGROUP - struct cgroup_subsys_state *blkcg_css; -#endif - const struct cred *cur_creds; - const struct cred *saved_creds; - struct nsproxy *restore_nsproxy; }; #if BITS_PER_LONG == 64 @@ -140,61 +129,6 @@ static void io_worker_release(struct io_worker *worker) wake_up_process(worker->task); } -/* - * Note: drops the wqe->lock if returning true! The caller must re-acquire - * the lock in that case. Some callers need to restart handling if this - * happens, so we can't just re-acquire the lock on behalf of the caller. - */ -static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) -{ - bool dropped_lock = false; - - if (worker->saved_creds) { - revert_creds(worker->saved_creds); - worker->cur_creds = worker->saved_creds = NULL; - } - - if (current->files) { - __acquire(&wqe->lock); - raw_spin_unlock_irq(&wqe->lock); - dropped_lock = true; - - task_lock(current); - current->files = NULL; - current->nsproxy = worker->restore_nsproxy; - task_unlock(current); - } - - if (current->fs) - current->fs = NULL; - - /* - * If we have an active mm, we need to drop the wq lock before unusing - * it. If we do, return true and let the caller retry the idle loop. 
- */ - if (worker->mm) { - if (!dropped_lock) { - __acquire(&wqe->lock); - raw_spin_unlock_irq(&wqe->lock); - dropped_lock = true; - } - __set_current_state(TASK_RUNNING); - kthread_unuse_mm(worker->mm); - mmput(worker->mm); - worker->mm = NULL; - } - -#ifdef CONFIG_BLK_CGROUP - if (worker->blkcg_css) { - kthread_associate_blkcg(NULL); - worker->blkcg_css = NULL; - } -#endif - if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; - return dropped_lock; -} - static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, struct io_wq_work *work) { @@ -239,10 +173,6 @@ static void io_worker_exit(struct io_worker *worker) raw_spin_lock_irq(&wqe->lock); hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - if (__io_worker_unuse(wqe, worker)) { - __release(&wqe->lock); - raw_spin_lock_irq(&wqe->lock); - } acct->nr_workers--; raw_spin_unlock_irq(&wqe->lock); @@ -327,11 +257,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) allow_kernel_signal(SIGINT); current->flags |= PF_IO_WORKER; - current->fs = NULL; - current->files = NULL; - worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); - worker->restore_nsproxy = current->nsproxy; io_wqe_inc_running(wqe, worker); } @@ -388,7 +314,7 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); } - return __io_worker_unuse(wqe, worker); + return false; } static inline unsigned int io_get_work_hash(struct io_wq_work *work) @@ -427,81 +353,6 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) return NULL; } -static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work) -{ - if (worker->mm) { - kthread_unuse_mm(worker->mm); - mmput(worker->mm); - worker->mm = NULL; - } - - if (mmget_not_zero(work->identity->mm)) { - kthread_use_mm(work->identity->mm); - worker->mm = 
work->identity->mm; - return; - } - - /* failed grabbing mm, ensure work gets cancelled */ - work->flags |= IO_WQ_WORK_CANCEL; -} - -static inline void io_wq_switch_blkcg(struct io_worker *worker, - struct io_wq_work *work) -{ -#ifdef CONFIG_BLK_CGROUP - if (!(work->flags & IO_WQ_WORK_BLKCG)) - return; - if (work->identity->blkcg_css != worker->blkcg_css) { - kthread_associate_blkcg(work->identity->blkcg_css); - worker->blkcg_css = work->identity->blkcg_css; - } -#endif -} - -static void io_wq_switch_creds(struct io_worker *worker, - struct io_wq_work *work) -{ - const struct cred *old_creds = override_creds(work->identity->creds); - - worker->cur_creds = work->identity->creds; - if (worker->saved_creds) - put_cred(old_creds); /* creds set by previous switch */ - else - worker->saved_creds = old_creds; -} - -static void io_impersonate_work(struct io_worker *worker, - struct io_wq_work *work) -{ - if ((work->flags & IO_WQ_WORK_FILES) && - current->files != work->identity->files) { - task_lock(current); - current->files = work->identity->files; - current->nsproxy = work->identity->nsproxy; - task_unlock(current); - if (!work->identity->files) { - /* failed grabbing files, ensure work gets cancelled */ - work->flags |= IO_WQ_WORK_CANCEL; - } - } - if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs) - current->fs = work->identity->fs; - if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm) - io_wq_switch_mm(worker, work); - if ((work->flags & IO_WQ_WORK_CREDS) && - worker->cur_creds != work->identity->creds) - io_wq_switch_creds(worker, work); - if (work->flags & IO_WQ_WORK_FSIZE) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize; - else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; - io_wq_switch_blkcg(worker, work); -#ifdef CONFIG_AUDIT - current->loginuid = work->identity->loginuid; - current->sessionid = 
work->identity->sessionid; -#endif -} - static void io_assign_current_work(struct io_worker *worker, struct io_wq_work *work) { @@ -557,7 +408,6 @@ get_next: unsigned int hash = io_get_work_hash(work); next_hashed = wq_next_work(work); - io_impersonate_work(worker, work); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -611,6 +461,8 @@ loop: raw_spin_unlock_irq(&wqe->lock); if (signal_pending(current)) flush_signals(current); + if (current->task_works) + task_work_run(); if (schedule_timeout(WORKER_IDLE_TIMEOUT)) continue; /* timed out, exit unless we're the fixed worker */ @@ -631,10 +483,72 @@ loop: return 0; } +static int task_thread(void *data, int index) +{ + struct io_worker *worker = data; + struct io_wqe *wqe = worker->wqe; + struct io_wqe_acct *acct = &wqe->acct[index]; + struct io_wq *wq = wqe->wq; + unsigned long flags; + + current->flags &= ~PF_KTHREAD; + current->pf_io_worker = worker; + worker->task = current; + + raw_spin_lock_irqsave(&current->pi_lock, flags); + set_cpus_allowed_common(current, cpumask_of_node(wqe->node), 0); + current->flags |= PF_NO_SETAFFINITY; + raw_spin_unlock_irqrestore(&current->pi_lock, flags); + + raw_spin_lock_irq(&wqe->lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); + list_add_tail_rcu(&worker->all_list, &wqe->all_list); + worker->flags |= IO_WORKER_F_FREE; + if (index == IO_WQ_ACCT_BOUND) + worker->flags |= IO_WORKER_F_BOUND; + if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) + worker->flags |= IO_WORKER_F_FIXED; + acct->nr_workers++; + raw_spin_unlock_irq(&wqe->lock); + + if (index == IO_WQ_ACCT_UNBOUND) + atomic_inc(&wq->user->processes); + + io_wqe_worker(data); + do_exit(0); +} + +static int task_thread_bound(void *data) +{ + return task_thread(data, IO_WQ_ACCT_BOUND); +} + +static int task_thread_unbound(void *data) +{ + return task_thread(data, IO_WQ_ACCT_UNBOUND); +} + +static pid_t fork_thread(int (*fn)(void *), void *arg) +{ + unsigned long flags = 
CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND| + CLONE_THREAD|SIGCHLD; + struct kernel_clone_args args = { + .flags = ((lower_32_bits(flags) | CLONE_VM | + CLONE_UNTRACED) & ~CSIGNAL), + .exit_signal = (lower_32_bits(flags) & CSIGNAL), + .stack = (unsigned long)fn, + .stack_size = (unsigned long)arg, + .io_wq = 1, + }; + + return kernel_clone(&args); +} + static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) { struct io_wq *wq = wqe->wq; struct io_worker *worker; + pid_t pid; worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); if (!worker) @@ -645,30 +559,16 @@ static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) worker->wqe = wqe; spin_lock_init(&worker->lock); - worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node, - "io_wqe_worker-%d/%d", 0, wqe->node); - if (IS_ERR(worker->task)) { + if (acct == &wqe->acct[IO_WQ_ACCT_BOUND]) + pid = fork_thread(task_thread_bound, worker); + else + pid = fork_thread(task_thread_unbound, worker); + if (pid < 0) { kfree(worker); return false; } - kthread_bind_mask(worker->task, cpumask_of_node(wqe->node)); - - raw_spin_lock_irq(&wqe->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); - list_add_tail_rcu(&worker->all_list, &wqe->all_list); - worker->flags |= IO_WORKER_F_FREE; - if (acct == &wqe->acct[IO_WQ_ACCT_BOUND]) - worker->flags |= IO_WORKER_F_BOUND; - if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) - worker->flags |= IO_WORKER_F_FIXED; - acct->nr_workers++; - raw_spin_unlock_irq(&wqe->lock); - - if (acct == &wqe->acct[IO_WQ_ACCT_UNBOUND]) - atomic_inc(&wq->user->processes); refcount_inc(&wq->refs); - wake_up_process(worker->task); return true; } @@ -677,7 +577,7 @@ static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) */ void io_wq_worker_running(struct task_struct *tsk) { - struct io_worker *worker = kthread_data(tsk); + struct io_worker *worker = tsk->pf_io_worker; struct io_wqe *wqe = worker->wqe; 
if (!(worker->flags & IO_WORKER_F_UP)) @@ -694,7 +594,7 @@ void io_wq_worker_running(struct task_struct *tsk) */ void io_wq_worker_sleeping(struct task_struct *tsk) { - struct io_worker *worker = kthread_data(tsk); + struct io_worker *worker = tsk->pf_io_worker; struct io_wqe *wqe = worker->wqe; if (!(worker->flags & IO_WORKER_F_UP)) @@ -1053,19 +953,25 @@ bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) return refcount_inc_not_zero(&wq->use_refs); } -static void __io_wq_destroy(struct io_wq *wq) +void io_wq_exit(struct io_wq *wq) { int node; - cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); - set_bit(IO_WQ_BIT_EXIT, &wq->state); rcu_read_lock(); for_each_node(node) io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); rcu_read_unlock(); +} + +static void __io_wq_destroy(struct io_wq *wq) +{ + int node; + + cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); + io_wq_exit(wq); if (refcount_dec_and_test(&wq->refs)) complete(&wq->done); diff --git a/fs/io-wq.h b/fs/io-wq.h index 096f1021018e5..a302fabd04ab4 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -110,6 +110,7 @@ struct io_wq_data { struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); bool io_wq_get(struct io_wq *wq, struct io_wq_data *data); void io_wq_destroy(struct io_wq *wq); +void io_wq_exit(struct io_wq *wq); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_hash_work(struct io_wq_work *work, void *val); diff --git a/fs/io_uring.c b/fs/io_uring.c index f8fa73e72836b..d019f2d9d78ae 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -9133,8 +9133,13 @@ void __io_uring_files_cancel(struct files_struct *files) /* make sure overflow events are dropped */ atomic_inc(&tctx->in_idle); - xa_for_each(&tctx->xa, index, file) - io_uring_cancel_task_requests(file->private_data, files); + xa_for_each(&tctx->xa, index, file) { + struct io_ring_ctx *ctx = file->private_data; + + if (ctx->io_wq) + io_wq_exit(ctx->io_wq); + 
io_uring_cancel_task_requests(ctx, files); + } atomic_dec(&tctx->in_idle); if (files) diff --git a/include/linux/sched.h b/include/linux/sched.h index 6e3a5eeec509a..a6a9f0323102f 100644 --- a/include/linux/sched.h +++ b/include/linux/sched.h @@ -895,6 +895,9 @@ struct task_struct { /* CLONE_CHILD_CLEARTID: */ int __user *clear_child_tid; + /* PF_IO_WORKER */ + void *pf_io_worker; + u64 utime; u64 stime; #ifdef CONFIG_ARCH_HAS_SCALED_CPUTIME diff --git a/include/linux/sched/task.h b/include/linux/sched/task.h index c0f71f2e7160c..e642483b7a576 100644 --- a/include/linux/sched/task.h +++ b/include/linux/sched/task.h @@ -30,6 +30,7 @@ struct kernel_clone_args { pid_t *set_tid; /* Number of elements in *set_tid */ size_t set_tid_size; + int io_wq; int cgroup; struct cgroup *cgrp; struct css_set *cset; diff --git a/kernel/fork.c b/kernel/fork.c index d66cd1014211b..cc9d53bea0bf3 100644 --- a/kernel/fork.c +++ b/kernel/fork.c @@ -1940,6 +1940,8 @@ static __latent_entropy struct task_struct *copy_process( p = dup_task_struct(current, node); if (!p) goto fork_out; + if (args->io_wq) + p->flags |= PF_KTHREAD; /* * This _must_ happen before we call free_task(), i.e. before we jump |
