linux内核调度的机制 tasklet/workqueue/kthread_worker/kthreadx详解及示例【转】

转自:https://blog.csdn.net/zxpblog/article/details/108539245

前言: 一直就感觉linux下面的任务调度机制太丰富了,由于各种调度机制平时工作中只是要用,理解并不是那么深刻,所有有时候说不上道道来,只知道这个要用softirq/tasklet/workqueue/thread/, workqueue的优先级要设置成system_wq,system_highpri_wq, system_unbound_wq 或者thread 的SCHED_RR/SCHED_FIFO这样子,说实话,现在我也不能保证说概述的很全很准确(有不对地方,欢迎大家指出)。就期待后面可以慢慢完善,读者如果有建议补充的可以提建议,我们一起不断更新这篇文章,一起努力可以把linux 线程相关的东西吃懂吃透,目的是争取为社区贡献一篇好文章。 闲话少说:先简述吧  第1章:linux常见的任务调度的机制 1.1 softirq(不允许休眠阻塞,中断上下文) 软终端支持SMP,同一个softirq可以在不同CPU同时运行,必须是可重入的,是编译期间静态分配的,不想tasklet一样能被动态注册,删除(个人感觉因为不方便,所以使用较少)。kernel/softirq.c文件中定义了一个包含32个softirq_action结构体的数组,每个被注册的软终端都占据该数组的一项,因此最多可能有32个软中断。 特性: 1)一个软中断不会抢占另一个软中断 2)唯一可以抢占软中断的是ISR(中断服务程序) 3)其它软中断可以在其它处理器同时执行 4)一个注册的软中断必须被标记后才执行 5)软中断不可以自己休眠(不能自己调用sleep,wait等函数) 6)索引号小的软中断在索引号大的软中断之前执行  1.2 tasklet(不允许休眠阻塞,中断上下文) 中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块到达触发中断时,中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理,首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别 中断bai处理的tasklet(小任务)机制 特性: 1)不允许两个相同的tasklet绝对不会同时执行,即使在不同CPU上。 2)从softirq衍生,但是使用简单,效率高(较softirq低,较workqueue高),常用 3)在中断期间运行时, 即使被多次调用也只会执行一次 4)SMP系统上,可以确保在第一个调用它的CPU执行  1.3 workqueue(允许休眠阻塞,进程上下文) 工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。 1)工作队列会在进程上下文中执行 2)可以阻塞 3)可以重新调度 4)缺省工作者线程(kthrerad worker && kthread work) 5)在工作队列和其它内核间用锁和其它进程上下文一样 6)默认允许响应中断 7)默认不持有任何锁 那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。  1.4 kthread Linux内核可以看作一个服务进程(管理软硬件资源,响应用户进程的种种合理以及不合理的请求)。内核需要多个执行流并行,为了防止可能的阻塞,支持多线程是必要的。内核线程就是内核的分身,一个分身可以处理一件特定事情。内核线程的调度由内核负责,一个内核线程处于阻塞状态时不影响其他的内核线程,因为其是调度的基本单位。这与用户线程是不一样的。因为内核线程只运行在内核态,因此,它只能使用大于PAGE_OFFSET(3G)的地址空间。内核线程和普通的进程间的区别在于内核线程没有独立的地址空间,mm指针被设置为NULL;它只在 内核空间运行,从来不切换到用户空间去;并且和普通进程一样,可以被调度,也可以被抢占。 内核线程(thread)或叫守护进程(daemon),在操作系统中占据相当大的比例,当Linux操作系统启动以后,你可以用”ps -ef”命令查看系统中的进程,这时会发现很多以”d”结尾的进程名,确切说名称显示里面加 []的,这些进程就是内核线程。 内核线程和普通的进程间的区别在于内核线程没有独立的地址空间,它只在 内核空间运行,从来不切换到用户空间去;并且和普通进程一样,可以被调度,也可以被抢占。让模块在加载后能一直运行下去的方法——内核线程。要创建一个内核线程有许多种方法。  第2章:linux常见的任务调度的优先级 2.1 workqueue(include/linux/workqueue.h)与waitequeue协作(include/linux/wait.h) 使用示例  定义workqueue  struct work_struct retreive_frame_work; 1 初始化workqueue和waitqueue及workqueue的callback的定义  INIT_WORK(&retreive_frame_work, retrieve_desc_task_callback); init_waitqueue_head(&frame_wq);  void retrieve_desc_task_callback(struct work_struct *work){    add_desc(iav, 0); //生产frame_desc    wake_up_interruptible_all(&frame_wq); //通知消费者,frame_desc已经生产完毕,可以去取了 }  中断ISR中:唤醒workqueue  queue_work(system_wq, &retreive_frame_work); 消费由 retrieve_desc_task_callback 生产的 frame_desc(blocking 的方式)  wait_event_interruptible(frame_wq, (desc = find_frame_desc()));  2.2 kthread(include/linux/kthread.h) 常见的优先级如下:SCHED_OTHER、SCHED_RR、SCHED_FIFO 本小节会主要介绍SCHED_RR和SCHED_FIFO这两种调度策略,因为SCHED_OTHER其实就是默认的调度机制,也就是说该咋咋滴,能被高优先级抢占,其实优先级就是最低的,属于金字塔食物链底端,谁都可以欺负它。谁优先级高就谁吃cpu cycle,同优先级就是linux来调度,轮着吃高优先级schedule 出来的cpu cycle。  SCHED_RR、SCHED_FIFO对比 对比才有伤害,是骡子是马都得见婆婆,丑媳妇也得拉出来溜溜。先上图再解析,直截了当。 说实话,我是真的懒,看到好图直接盗图了,开源精神有。如作者不允许到请联系我,我自己画一个哈。 参考链接: 一个披着机器人工程师外套的程序员. 图参考链接: cynchanpin. SCHED_RR的任务调度时间片如下图1  SCHED_FIFO任务调度时间片如下图2  剖析:这里内核创建了四个优先级相同的线程,对于SCHED_RR而言是每个任务有一个特定的时间片,轮转依次运行,而SCHED_FIFO是一个任务执行完再去执行下一个任务(优先级高),其执行顺序是创建的先后。SCHED_RR是依据时间片来调度线程的,当时间片用完后,无论该线程优先级多高,都不会再执行,而是进入就绪队列,等待下一个时间片到来。只是图1显示,在thread5798时间片用完时,该线程紧接着进行了一次抢占preemption。又获得了一个时间片。顺便提一句时间片长度的定位是linux凭经验来的。即选择尽可能长、同一时候能保持良好对应时间的一个时间片。  SCHED_FIFO:先进先出调度 SCHED_FIFO线程的优先级必须大于0,当它运行时,一定会抢占正在运行的普通策略的线程(SCHED_OTHER, SCHED_IDLE, SCHED_BATCH);SCHED_FIFO策略是没有时间片的算法,需要遵循以下规则: 1)如果一个SCHED_FIFO线程被高优先级线程抢占了,那么它将会被添加到该优先级等待列表的首部,以便当所有高优先级的线程阻塞的时候得到继续运行; 2)当一个阻塞的SCHED_FIFO线程变为可运行时,它将被加入到同优先级列表的尾部; 3)如果通过系统调用改变线程的优先级,则根据不同情况有不同的处理方式: a)如果优先级提高了,那么线程会被添加到所对应新优先级的尾部,因此,这个线程有可能会抢占当前运行的同优先级的线程; b)如果优先级没变,那么线程在列表中的位置不变; c)如果优先级降低了,那么它将被加入到新优先级列表的首部; 根据POSIX.1-2008规定,除了使用pthread_setschedprio(3)以外,通过使用其他方式改变策略或者优先级会使得线程加入到对应优先级列表的尾部; 4)如果线程调用了sched_yield(2),那么它将被加入到列表的尾部; SCHED_FIFO会一直运行,直到它被IO请求阻塞,或者被更高优先级的线程抢占,亦或者调用了sched_yield(); 5) 处于可运行状态的SCHED_FIFO级的进程会比任何SCHED_NORMAL级的进程都先得到调用 6) 一旦一个SCHED_FIFO级进程处于可执行状态,就会一直执行,直到它自己受阻塞或显式地释放处理器为止,它不基于时间片,可以一直执行下去 7) 只有更高优先级的SCHED_FIFO或者SCHED_RR任务才能抢占SCHED_FIFO任务 如果有两个或者更多的同优先级的SCHED_FIFO级进程,它们会轮流执行,但是依然只有在它们愿意让出处理器时才会退出 8) 只要有SCHED_FIFO级进程在执行,其他级别较低的进程就只能等待它变为不可运行态后才有机会执行  SCHED_RR:轮转调度 SCHED_RR是SCHED_FIFO的简单增强,除了对于线程占用的时间总量之外,对于SCHED_FIFO适用的规则对于SCHED_RR同样适用;如果SCHED_RR线程的运行时间大于等于时间总量,那么它将被加入到对应优先级列表的尾部;如果SCHED_RR线程被抢占了,当它继续运行时它只运行剩余的时间量;时间总量可以通过sched_rr_get_interval()函数获取; 当SCHED_RR任务耗尽它的时间片时,在同一优先级的其他实时进程被轮流调度 时间片只用来重新调度同一优先级的进程 对于SCHED_FIFO进程, 优先级总是立即抢占低优先级,但低优先级进程决不能抢占SCHED_RR任务,即使它的时间片耗尽 SCHED_OTHER:默认Linux时间共享调度 SCHED_OTHER只能用于优先级为0的线程,SCHED_OTHER策略是所有不需要实时调度线程的统一标准策略;调度器通过动态优先级来决定调用哪个SCHED_OTHER线程,动态优先级是基于nice值的,nice值随着等待运行但是未被调度执行的时间总量的增长而增加;这样的机制保证了所有SCHED_OTHER线程调度的公平性  MAX_RT_PRIO:实时优先级 1) 实时优先级范围从0到MAX_RT_PRIO减1 2) 默认情况下,MAC_RT_RTIO为100——所以默认的实时优先级范围从0到99 3) SCHED_NORMAL级进程的noce值共享了这个取值空间。它的取值范围从MAC_RT_PRIO到(MAX_RT_PRIO+40)。也就是说,在默认情况下,nice值从-20到+19直接对应的是从100到139的实时优先级范围 4) struct sched_param param = { .sched_priority = MAX_RT_PRIO },最后优先级priority是越小越好,最后的值会转换为priority = n - sched_priority,所以sched_priority 越大,则其实优先级越高。  限制实时线程的CPU使用时间 SCHED_FIFO, SCHED_RR的线程如果内部是一个非阻塞的死循环,那么它将一直占用CPU,使得其它线程没有机会运行; 在2.6.25以后出现了限制实时线程运行时间的新方式,可以使用RLIMIT_RTTIME来限制实时线程的CPU占用时间;Linux也提供了两个proc文件,用于控制为非实时线程运行预留CPU时间; /proc/sys/kernel/sched_rt_period_us 这个文件中的数值指定了总CPU(100%)时间的宽度值,默认值是1,000,000; /proc/sys/kernel/sched_rt_runtime_us 这个文件中的数值指定了实时线程可以运行的CPU时间宽度,如果设置为-1,则认为不给非实时线程预留任何运行时间,默认值是950,000,因为第一个文件的总量是1,000,000,也就是说默认配置为非实时线程预留了5%的CPU时间;  使用示例 现在有一个需求kernel中,共一个SOF(start of frame)中断唤醒thread A, thread A 去唤醒 thread B, thread A 和 B 执行完就分别进休眠状态,从中断到thread B 被唤醒必须在4ms内(即使在arm的loading很重的时候)(具体原因是码流的idsp/iso/shutter/again/dgain等参数必须要在一帧内下进去,而sensor信号时I2C发送的,如果fps = 120,一帧数据传输+I2C stop和start信号之间的间隙约8ms,所以要尽可能间断其时从SOF 中断进来,到更新参数thread被唤醒的时间,现预估4ms内,实际应该更小),thread A 去apply参数,thread B 去update参数,具体流程是:  一开始有一个准备好的参数-》 SOF中断trigger唤醒thread A 去apply 参数-》 thread A 去唤醒thread B 去更新参数: 同时有其它一个线程C一直更新参数,thread B 唤醒时,可以去拿参数。 初始化线程  inline void vin_create_kthreads(struct vin_device *vdev) {     struct sched_param sched_param = { .sched_priority = MAX_RT_PRIO / 2 };      if (!thread_B.kthread) {         sema_init(&sem_b, 0);         thread_B.kthread = kthread_run(update_sht_agc_task, vdev, update_sht_agc);         sched_setscheduler(thread_B.kthread, SCHED_FIFO, &sched_param);     }      if (!thread_A.kthread) {         sema_init(&sem_a, 0);         thread_A.kthread = kthread_run(apply_sht_agc, vdev, apply_sht_agc);         sched_setscheduler(thread_A.kthread, SCHED_FIFO, &sched_param);     }     return ; }  apply参数线程的结构体  struct apply_sht_agc {     struct task_struct *kthread;     struct semaphore sem;     u32 exit_kthread : 1;     u32 reserved0 : 31; };  SOF中断唤醒apply 参数线程  #include <linux/time.h> long ns_irq = 0; struct timespec64  tstart_irq; idsp_sof_irq {     atomic_set(&vinc->wait_sof, 0);     ktime_get_real_ts64(&tstart_irq);     ns_irq = tstart_irq.tv_nsec;     up(&vsem_a); }  apply参数线程,执行结束唤醒更新参数线程  #include <linux/time.h> extern struct timespec64  tstart_irq; extern long ns_irq; long ns_task = 0; u8 debug_times = 0; int apply_shutter_agc_task(void *arg) {     while (!kthread_should_stop()) {         if (thread_A.exit_kthread) {             continue;         }          ktime_get_real_ts64(&tstart_irq);         ns_task = tstart_irq.tv_nsec;         if (ns_task - ns_irq > 2000000 || debug_times < 2) {             iav_debug(ns_task = %ld, ns_irq = %ld\n, ns_task, ns_irq);             debug_times++;         }          if (down_interruptible(&sem_a)) {             continue;         }         //apply 参数         up(&sem_b);         wake_up_interruptible_all(&a->sht_agc_wq);         return 0; }  更新参数线程  int update_sht_agc_task(void *arg) {     while (!kthread_should_stop()) {         if (thread_B->exit_kthread) {             continue;         }          /* wait for wdr setting ioctl */         if (down_interruptible(&sem_b)) {             continue;         }        //更新参数         }     }     return 0; }  停止线程  inline void vin_dev_stop_kthreads(struct vin_device *vdev) {     if (thread_A.kthread) {         thread_A.exit_kthread = 1;         up(&sem_a);         kthread_stop(thread_A.kthread);         thread_A.kthread = 0;         thread_A.exit_kthread = 0;     }      if (thread_B.kthread) {         thread_B.exit_kthread = 1;         up(&sem_b);         kthread_stop(thread_B.kthread);         thread_B.kthread = 0;         thread_B.exit_kthread = 0;     }     return ; }  2.3 kthread work && kthread worker kthread worker 和 kthread work机制组合其实如名字所述,比较清晰,就是一个worker可以同时做好几个work(下图可以有助于大家理解,用链表维护work),其实kthread worker其实就是创建了一个线程,我理解其实类似单片机里面的单个线程,单片机也能根据timer等机制来实现多任务工作,实际上也是 “宏观上并行,微观上串行的“, 这样看来其实我们的work其实就是宏观并行,微观串行的任务了。 图参考链接: 小小城御园 .    结构体分析 (include/linux/kthread.h 文件中) kthread_worker 结构体  struct kthread_worker {     unsigned int flags;     spinlock_t lock;      //保护work_list链表的自旋锁     struct list_head work_list;    //kthread_work 链表,相当于流水线     struct list_head delayed_work_list;   // 延时工作 list     struct task_struct *task;      //为该kthread_worker执行任务的线程对应的task_struct结构     struct kthread_work *current_work;    //当前正在处理的kthread_work };  kthread_work 结构体  struct kthread_work {     struct list_head    node;  //kthread_work链表的链表元素     kthread_work_func_t    func;  //执行函数,该kthread_work所要做的事情     struct kthread_worker    *worker;  //处理该kthread_work的kthread_worker     /* Number of canceling calls that are running at the moment. */     int canceling; };  结构体定义 struct kthread_worker kworker; //声明一个kthread_worker struct kthread_work kwork0; //声明一个kthread_work struct task_struct *kworker_task;//声明一个kthread句柄  初始化 kthread_init_worker(&kworker); \\将该worker初始化 /* kthread_worker_fn 会执行一个循环调度看worker里面work链表是否有任务,有任务则调callback执行任务*/ kworker_task = kthread_run(kthread_worker_fn, &kworker, worker);  \\创建kworker线程并开始运行 kthread_init_work(&kwork0, kwork_callback); //初始化kthread_work,设置work执行函数kwork_callback sched_setscheduler(bsb_ctx->bsb_kworker_task, SCHED_FIFO, &param);  \\修改kworker线程的调度机制为FIFO  运行 kthread_queue_work(&kworker, &kwork0); //将work挂到worker上,然后该线程就会去调度执行这个work了 1 销毁阶段 /* 刷新指定 worker 上所有 work,将指定的 worker 上的所有 work 全部执行完,一般会在kthread_stop之前使用 */ kthread_flush_worker(&kworker); if (kworker_task) {     /* 停止当前线程一般和kthread_flush_worker一起使用,一般都会在队列的所有任务都完成之后再停止进程 */     kthread_stop(kworker_task);     kworker_task= NULL; }  kthread_worker_fn 函数剖析 int kthread_worker_fn(void *worker_ptr) {     struct kthread_worker *worker = worker_ptr;     struct kthread_work *work;     /*      * FIXME: Update the check and remove the assignment when all kthread      * worker users are created using kthread_create_worker*() functions.      */     WARN_ON(worker->task && worker->task != current);     worker->task = current;       if (worker->flags & KTW_FREEZABLE)         set_freezable();   repeat:     set_current_state(TASK_INTERRUPTIBLE);    /* mb paired w/ kthread_stop */       if (kthread_should_stop()) {         __set_current_state(TASK_RUNNING);         spin_lock_irq(&worker->lock);         worker->task = NULL;         spin_unlock_irq(&worker->lock);         return 0;     }       work = NULL;     spin_lock_irq(&worker->lock);     if (!list_empty(&worker->work_list)) {         work = list_first_entry(&worker->work_list,                     struct kthread_work, node);         list_del_init(&work->node);     }     worker->current_work = work;     spin_unlock_irq(&worker->lock);     if (work) {         __set_current_state(TASK_RUNNING);         work->func(work);     } else if (!freezing(current))         schedule();     try_to_freeze();     goto repeat; } EXPORT_SYMBOL_GPL(kthread_worker_fn);  代码逻辑如下: 1、首先是赋值了传入的 worker->task 为 current,当前进程,设置状态为 TASK_INTERRUPTIBLE; 2、Check 标志位,看是否需要关闭这个 kthread_worker_fn 内核线程,如果需要关闭,则进程状态,并清空 worker 下对应的 work 3、判断当前的 worker 的 work_list 上是否为空,如果非空,那么取出它,设置成为 worker->current_work,即当前的 work 4、执行取出的 work->func,这个最终就会掉用我们之前注册的回调函数xxx_work_fn 5、如果 worker 的 work_list 上为空,也就是没有任务,那么就会调用 schedule(),这个 kthread_worker_fn 执行线程进入睡眠 6、如果没有退出,则 goto repeat,继续执行  2.4 tasklet(include/linux/interrupt.h) 使用示例 中断:突然有一辆车撞到你了 task A:马上送去医院救治(优先级高) task B:去找司机理论(优先级低) 在中断中,task A,B两件事都应该做的对吗,但是事有轻重缓急,很明显A 比较重要,B 次之,但是也要做的。  定义tasklet的任务  struct tasklet_struct    A_tasklet; 初始化 tasklet 函数定义  void tasklet_init(struct tasklet_struct *t,              void (*func)(unsigned long), unsigned long data);  初始化tasklet,把task B绑定到tasklet中  tasklet_init(&dsp->vdsp_tasklet, task_B, (unsigned long)dsp);  中断函数(ISR)中  ISR {     //do task A     tasklet_schedule(&dsp->vcap_tasklet); //去调度task,执行vdsp_task ,而vdsp_task 中放task B }  做不紧急的事情  vdsp_task {    //do task B }  文章知识点与 ———————————————— 版权声明:本文为CSDN博主「雨中奔跑的大蒜苗」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/zxpblog/article/details/108539245