aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2021-02-15 13:25:53 -0700
committerJens Axboe <axboe@kernel.dk>2021-02-15 13:41:54 -0700
commit276f31457f375639fd79c9eaf975593e750cd7f2 (patch)
tree1facab499b7edcb2ef73736429b2242394682f24
parent2190fb8ebaac8419fd31051ff69869c6971bedbb (diff)
downloadlinux-block-276f31457f375639fd79c9eaf975593e750cd7f2.tar.gz
io-wq: fork worker threads from original task
Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--fs/io-wq.c256
-rw-r--r--fs/io-wq.h1
-rw-r--r--fs/io_uring.c9
-rw-r--r--include/linux/sched.h3
-rw-r--r--include/linux/sched/task.h1
-rw-r--r--kernel/fork.c2
6 files changed, 95 insertions, 177 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 79e1cbc7d3f77..5cb1629db7e13 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -13,12 +13,8 @@
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
-#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
-#include <linux/fs_struct.h>
#include <linux/task_work.h>
-#include <linux/blk-cgroup.h>
-#include <linux/audit.h>
#include <linux/cpu.h>
#include "../kernel/sched/sched.h"
@@ -58,13 +54,6 @@ struct io_worker {
spinlock_t lock;
struct rcu_head rcu;
- struct mm_struct *mm;
-#ifdef CONFIG_BLK_CGROUP
- struct cgroup_subsys_state *blkcg_css;
-#endif
- const struct cred *cur_creds;
- const struct cred *saved_creds;
- struct nsproxy *restore_nsproxy;
};
#if BITS_PER_LONG == 64
@@ -140,61 +129,6 @@ static void io_worker_release(struct io_worker *worker)
wake_up_process(worker->task);
}
-/*
- * Note: drops the wqe->lock if returning true! The caller must re-acquire
- * the lock in that case. Some callers need to restart handling if this
- * happens, so we can't just re-acquire the lock on behalf of the caller.
- */
-static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
-{
- bool dropped_lock = false;
-
- if (worker->saved_creds) {
- revert_creds(worker->saved_creds);
- worker->cur_creds = worker->saved_creds = NULL;
- }
-
- if (current->files) {
- __acquire(&wqe->lock);
- raw_spin_unlock_irq(&wqe->lock);
- dropped_lock = true;
-
- task_lock(current);
- current->files = NULL;
- current->nsproxy = worker->restore_nsproxy;
- task_unlock(current);
- }
-
- if (current->fs)
- current->fs = NULL;
-
- /*
- * If we have an active mm, we need to drop the wq lock before unusing
- * it. If we do, return true and let the caller retry the idle loop.
- */
- if (worker->mm) {
- if (!dropped_lock) {
- __acquire(&wqe->lock);
- raw_spin_unlock_irq(&wqe->lock);
- dropped_lock = true;
- }
- __set_current_state(TASK_RUNNING);
- kthread_unuse_mm(worker->mm);
- mmput(worker->mm);
- worker->mm = NULL;
- }
-
-#ifdef CONFIG_BLK_CGROUP
- if (worker->blkcg_css) {
- kthread_associate_blkcg(NULL);
- worker->blkcg_css = NULL;
- }
-#endif
- if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
- return dropped_lock;
-}
-
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
struct io_wq_work *work)
{
@@ -239,10 +173,6 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_lock_irq(&wqe->lock);
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
- if (__io_worker_unuse(wqe, worker)) {
- __release(&wqe->lock);
- raw_spin_lock_irq(&wqe->lock);
- }
acct->nr_workers--;
raw_spin_unlock_irq(&wqe->lock);
@@ -327,11 +257,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
allow_kernel_signal(SIGINT);
current->flags |= PF_IO_WORKER;
- current->fs = NULL;
- current->files = NULL;
-
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
- worker->restore_nsproxy = current->nsproxy;
io_wqe_inc_running(wqe, worker);
}
@@ -388,7 +314,7 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
}
- return __io_worker_unuse(wqe, worker);
+ return false;
}
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
@@ -427,81 +353,6 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
return NULL;
}
-static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
-{
- if (worker->mm) {
- kthread_unuse_mm(worker->mm);
- mmput(worker->mm);
- worker->mm = NULL;
- }
-
- if (mmget_not_zero(work->identity->mm)) {
- kthread_use_mm(work->identity->mm);
- worker->mm = work->identity->mm;
- return;
- }
-
- /* failed grabbing mm, ensure work gets cancelled */
- work->flags |= IO_WQ_WORK_CANCEL;
-}
-
-static inline void io_wq_switch_blkcg(struct io_worker *worker,
- struct io_wq_work *work)
-{
-#ifdef CONFIG_BLK_CGROUP
- if (!(work->flags & IO_WQ_WORK_BLKCG))
- return;
- if (work->identity->blkcg_css != worker->blkcg_css) {
- kthread_associate_blkcg(work->identity->blkcg_css);
- worker->blkcg_css = work->identity->blkcg_css;
- }
-#endif
-}
-
-static void io_wq_switch_creds(struct io_worker *worker,
- struct io_wq_work *work)
-{
- const struct cred *old_creds = override_creds(work->identity->creds);
-
- worker->cur_creds = work->identity->creds;
- if (worker->saved_creds)
- put_cred(old_creds); /* creds set by previous switch */
- else
- worker->saved_creds = old_creds;
-}
-
-static void io_impersonate_work(struct io_worker *worker,
- struct io_wq_work *work)
-{
- if ((work->flags & IO_WQ_WORK_FILES) &&
- current->files != work->identity->files) {
- task_lock(current);
- current->files = work->identity->files;
- current->nsproxy = work->identity->nsproxy;
- task_unlock(current);
- if (!work->identity->files) {
- /* failed grabbing files, ensure work gets cancelled */
- work->flags |= IO_WQ_WORK_CANCEL;
- }
- }
- if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
- current->fs = work->identity->fs;
- if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
- io_wq_switch_mm(worker, work);
- if ((work->flags & IO_WQ_WORK_CREDS) &&
- worker->cur_creds != work->identity->creds)
- io_wq_switch_creds(worker, work);
- if (work->flags & IO_WQ_WORK_FSIZE)
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
- else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
- io_wq_switch_blkcg(worker, work);
-#ifdef CONFIG_AUDIT
- current->loginuid = work->identity->loginuid;
- current->sessionid = work->identity->sessionid;
-#endif
-}
-
static void io_assign_current_work(struct io_worker *worker,
struct io_wq_work *work)
{
@@ -557,7 +408,6 @@ get_next:
unsigned int hash = io_get_work_hash(work);
next_hashed = wq_next_work(work);
- io_impersonate_work(worker, work);
wq->do_work(work);
io_assign_current_work(worker, NULL);
@@ -611,6 +461,8 @@ loop:
raw_spin_unlock_irq(&wqe->lock);
if (signal_pending(current))
flush_signals(current);
+ if (current->task_works)
+ task_work_run();
if (schedule_timeout(WORKER_IDLE_TIMEOUT))
continue;
/* timed out, exit unless we're the fixed worker */
@@ -631,10 +483,72 @@ loop:
return 0;
}
+static int task_thread(void *data, int index)
+{
+ struct io_worker *worker = data;
+ struct io_wqe *wqe = worker->wqe;
+ struct io_wqe_acct *acct = &wqe->acct[index];
+ struct io_wq *wq = wqe->wq;
+ unsigned long flags;
+
+ current->flags &= ~PF_KTHREAD;
+ current->pf_io_worker = worker;
+ worker->task = current;
+
+ raw_spin_lock_irqsave(&current->pi_lock, flags);
+ set_cpus_allowed_common(current, cpumask_of_node(wqe->node), 0);
+ current->flags |= PF_NO_SETAFFINITY;
+ raw_spin_unlock_irqrestore(&current->pi_lock, flags);
+
+ raw_spin_lock_irq(&wqe->lock);
+ hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+ list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+ worker->flags |= IO_WORKER_F_FREE;
+ if (index == IO_WQ_ACCT_BOUND)
+ worker->flags |= IO_WORKER_F_BOUND;
+ if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+ worker->flags |= IO_WORKER_F_FIXED;
+ acct->nr_workers++;
+ raw_spin_unlock_irq(&wqe->lock);
+
+ if (index == IO_WQ_ACCT_UNBOUND)
+ atomic_inc(&wq->user->processes);
+
+ io_wqe_worker(data);
+ do_exit(0);
+}
+
+static int task_thread_bound(void *data)
+{
+ return task_thread(data, IO_WQ_ACCT_BOUND);
+}
+
+static int task_thread_unbound(void *data)
+{
+ return task_thread(data, IO_WQ_ACCT_UNBOUND);
+}
+
+static pid_t fork_thread(int (*fn)(void *), void *arg)
+{
+ unsigned long flags = CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|
+ CLONE_THREAD|SIGCHLD;
+ struct kernel_clone_args args = {
+ .flags = ((lower_32_bits(flags) | CLONE_VM |
+ CLONE_UNTRACED) & ~CSIGNAL),
+ .exit_signal = (lower_32_bits(flags) & CSIGNAL),
+ .stack = (unsigned long)fn,
+ .stack_size = (unsigned long)arg,
+ .io_wq = 1,
+ };
+
+ return kernel_clone(&args);
+}
+
static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
struct io_wq *wq = wqe->wq;
struct io_worker *worker;
+ pid_t pid;
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker)
@@ -645,30 +559,16 @@ static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
worker->wqe = wqe;
spin_lock_init(&worker->lock);
- worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
- "io_wqe_worker-%d/%d", 0, wqe->node);
- if (IS_ERR(worker->task)) {
+ if (acct == &wqe->acct[IO_WQ_ACCT_BOUND])
+ pid = fork_thread(task_thread_bound, worker);
+ else
+ pid = fork_thread(task_thread_unbound, worker);
+ if (pid < 0) {
kfree(worker);
return false;
}
- kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));
-
- raw_spin_lock_irq(&wqe->lock);
- hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
- list_add_tail_rcu(&worker->all_list, &wqe->all_list);
- worker->flags |= IO_WORKER_F_FREE;
- if (acct == &wqe->acct[IO_WQ_ACCT_BOUND])
- worker->flags |= IO_WORKER_F_BOUND;
- if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
- worker->flags |= IO_WORKER_F_FIXED;
- acct->nr_workers++;
- raw_spin_unlock_irq(&wqe->lock);
-
- if (acct == &wqe->acct[IO_WQ_ACCT_UNBOUND])
- atomic_inc(&wq->user->processes);
refcount_inc(&wq->refs);
- wake_up_process(worker->task);
return true;
}
@@ -677,7 +577,7 @@ static bool create_io_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
*/
void io_wq_worker_running(struct task_struct *tsk)
{
- struct io_worker *worker = kthread_data(tsk);
+ struct io_worker *worker = tsk->pf_io_worker;
struct io_wqe *wqe = worker->wqe;
if (!(worker->flags & IO_WORKER_F_UP))
@@ -694,7 +594,7 @@ void io_wq_worker_running(struct task_struct *tsk)
*/
void io_wq_worker_sleeping(struct task_struct *tsk)
{
- struct io_worker *worker = kthread_data(tsk);
+ struct io_worker *worker = tsk->pf_io_worker;
struct io_wqe *wqe = worker->wqe;
if (!(worker->flags & IO_WORKER_F_UP))
@@ -1053,19 +953,25 @@ bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
return refcount_inc_not_zero(&wq->use_refs);
}
-static void __io_wq_destroy(struct io_wq *wq)
+void io_wq_exit(struct io_wq *wq)
{
int node;
- cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-
set_bit(IO_WQ_BIT_EXIT, &wq->state);
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
+}
+
+static void __io_wq_destroy(struct io_wq *wq)
+{
+ int node;
+
+ cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
+ io_wq_exit(wq);
if (refcount_dec_and_test(&wq->refs))
complete(&wq->done);
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 096f1021018e5..a302fabd04ab4 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -110,6 +110,7 @@ struct io_wq_data {
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data);
void io_wq_destroy(struct io_wq *wq);
+void io_wq_exit(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val);
diff --git a/fs/io_uring.c b/fs/io_uring.c
index f8fa73e72836b..d019f2d9d78ae 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -9133,8 +9133,13 @@ void __io_uring_files_cancel(struct files_struct *files)
/* make sure overflow events are dropped */
atomic_inc(&tctx->in_idle);
- xa_for_each(&tctx->xa, index, file)
- io_uring_cancel_task_requests(file->private_data, files);
+ xa_for_each(&tctx->xa, index, file) {
+ struct io_ring_ctx *ctx = file->private_data;
+
+ if (ctx->io_wq)
+ io_wq_exit(ctx->io_wq);
+ io_uring_cancel_task_requests(ctx, files);
+ }
atomic_dec(&tctx->in_idle);
if (files)
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 6e3a5eeec509a..a6a9f0323102f 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -895,6 +895,9 @@ struct task_struct {
/* CLONE_CHILD_CLEARTID: */
int __user *clear_child_tid;
+ /* PF_IO_WORKER */
+ void *pf_io_worker;
+
u64 utime;
u64 stime;
#ifdef CONFIG_ARCH_HAS_SCALED_CPUTIME
diff --git a/include/linux/sched/task.h b/include/linux/sched/task.h
index c0f71f2e7160c..e642483b7a576 100644
--- a/include/linux/sched/task.h
+++ b/include/linux/sched/task.h
@@ -30,6 +30,7 @@ struct kernel_clone_args {
pid_t *set_tid;
/* Number of elements in *set_tid */
size_t set_tid_size;
+ int io_wq;
int cgroup;
struct cgroup *cgrp;
struct css_set *cset;
diff --git a/kernel/fork.c b/kernel/fork.c
index d66cd1014211b..cc9d53bea0bf3 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -1940,6 +1940,8 @@ static __latent_entropy struct task_struct *copy_process(
p = dup_task_struct(current, node);
if (!p)
goto fork_out;
+ if (args->io_wq)
+ p->flags |= PF_KTHREAD;
/*
* This _must_ happen before we call free_task(), i.e. before we jump