@@ -825,13 +825,14 @@ RISC-V 并不原生支持 CAS/TAS 原子指令,但我们可以通过 LR/SC 指
825
825
826
826
下面介绍我们的操作系统如何实现阻塞机制以及阻塞机制的若干应用。
827
827
828
- 实现阻塞机制
828
+ 实现阻塞与唤醒机制
829
829
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
830
830
831
- 在 ``suspend/exit_current_and_run_next `` 之外,我们新增第三种任务管理接口 ``block_current_and_run_next `` :
831
+ 在任务管理方面,此前我们已经有 ``suspend/exit_current_and_run_next `` 两种接口。现在我们新增第三种接口 ``block_current_and_run_next `` :
832
832
833
833
.. code-block :: rust
834
834
:linenos:
835
+ :emphasize-lines: 18
835
836
836
837
// os/src/task/mod.rs
837
838
@@ -844,12 +845,249 @@ RISC-V 并不原生支持 CAS/TAS 原子指令,但我们可以通过 LR/SC 指
844
845
schedule(task_cx_ptr);
845
846
}
846
847
848
+ pub fn suspend_current_and_run_next() {
849
+ let task = take_current_task().unwrap();
850
+ let mut task_inner = task.inner_exclusive_access();
851
+ let task_cx_ptr = &mut task_inner.task_cx as *mut TaskContext;
852
+ task_inner.task_status = TaskStatus::Ready;
853
+ drop(task_inner);
854
+ add_task(task);
855
+ schedule(task_cx_ptr);
856
+ }
857
+
858
+ 当一个线程陷入内核态之后,如果内核发现这个线程需要等待某个暂未到来的事件或暂未满足的条件,就需要调用这个函数阻塞这个线程。从实现来看,第 4 行我们将线程从当前 CPU 的处理器管理结构中移除;第 5~8 行我们将线程状态修改为阻塞状态 Blocked ;最后在第 9 行我们保存当前的任务上下文到线程控制块中并触发调度切换到其他线程。
859
+
860
+ 作为比较,上面还给出了我们非常熟悉的会在时钟中断时被调用的 ``suspend_current_and_run_next `` 函数。我们的 block 版本和它的区别仅仅在于我们会将线程状态修改为 Blocked 以及我们此处 **不会将被阻塞的线程重新加回到就绪队列中 ** 。这样才能保证被阻塞的线程在事件到来之前不会被调度到。
861
+
862
+ 上面就是阻塞机制的实现,那么唤醒机制如何实现呢?当一个事件到来或是条件被满足的时候,首先我们要找到有哪些线程在等待这个事件或条件,这样才能够唤醒它们。因此,在内核中我们会 **将被阻塞的线程的控制块按照它们等待的具体事件或条件分类存储 ** 。通常情况,对于每种事件,我们将所有等待该事件的线程而被阻塞的线程保存在这个事件的阻塞队列(或称等待队列)中。这样我们在事件到来的时候就知道要唤醒哪些线程了。
863
+
864
+ 在事件到来的时候,我们要从事件的等待队列中取出线程,并调用唤醒它们的函数 ``wakeup_task `` :
865
+
866
+ .. code-block :: rust
867
+ :linenos:
868
+
869
+ // os/src/task/manager.rs
870
+
871
+ pub fn wakeup_task(task: Arc<TaskControlBlock>) {
872
+ let mut task_inner = task.inner_exclusive_access();
873
+ task_inner.task_status = TaskStatus::Ready;
874
+ drop(task_inner);
875
+ add_task(task);
876
+ }
877
+
878
+ 这里只是简单的将线程状态修改为就绪状态 Ready 并将线程加回到就绪队列。
879
+
880
+ 在引入阻塞机制后,还需要注意它跟线程机制的结合。我们知道,当进程的主线程退出之后,进程的所有其他线程都会被强制退出。此时,这些线程不光有可能处于就绪队列中,还有可能正被阻塞等待某些事件,因而在这些事件的阻塞队列中。所以,在线程退出时,我们需要检查线程所有可能出现的位置并将线程控制块移除,不然就会造成内存泄漏。有兴趣的同学可以参考 ``os/src/task/mod.rs `` 中的 ``remove_inactive_task `` 的实现。
881
+
882
+ 这样,我们就成功实现了阻塞-唤醒机制。
883
+
847
884
基于阻塞机制实现 sleep 系统调用
848
885
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
849
886
887
+ 现在我们尝试基于阻塞机制将 sleep 功能作为一个系统调用而不是在用户库 ``user_lib `` 中通过 yield 来实现。首先来看该系统调用的接口定义:
888
+
889
+ .. code-block :: rust
890
+
891
+ /// 功能:当前线程睡眠一段时间。
892
+ /// 参数: sleep_ms 表示线程睡眠的时间,单位为毫秒。
893
+ /// 返回值: 0
894
+ /// syscall ID : 101
895
+ pub fn sys_sleep(sleep_ms: usize) -> isize;
896
+
897
+ 当线程使用这个系统调用之后,它将在陷入内核态之后被阻塞。线程等待的事件则是 :ref: `时钟计数器 <link-time-counter >` 的值超过当前时间再加上线程睡眠的时长的总和,也就是超时之后就可以唤醒线程了。因此,我们要等待的事件可以用一个超时时间表示。然后,我们用一个数据结构 ``TimerCondVar `` 将这个超时时间以及等待它的线程放到一起,这是唤醒机制的关键数据结构:
898
+
899
+ .. code-block :: rust
900
+ :linenos:
901
+
902
+ pub struct TimerCondVar {
903
+ pub expire_ms: usize,
904
+ pub task: Arc<TaskControlBlock>,
905
+ }
906
+
907
+ 那么如何保证在超时的时候内核能够接收到这个事件并做相应处理呢?我们这里选择一种比较简单的做法:即在每次时钟中断的时候检查在上个时间片中是否有一些线程的睡眠超时了,如果有的话我们就唤醒它们。这样的做法可能使得线程实际睡眠的时间不太精确,但是其误差也不会超过一个时间片,还算可以接受。同学有兴趣的话可以想想看有没有什么更好的做法。
908
+
909
+ 当时钟中断的时候我们可以扫描所有的 ``TimerCondVar `` ,将其中已经超时的移除并唤醒相应的线程。不过这里我们可以用学过的数据结构知识做一点小优化:可以以超时时间为键值将所有的 ``TimerCondVar `` 组织成一个小根堆(另一种叫法是优先级队列),这样每次时钟中断的时候只需不断弹出堆顶直到堆顶还没有超时。
910
+
911
+ 为了能将 ``TimerCondVar `` 放入堆中,需要为其定义偏序、全序比较方法和相等运算,也就是要实现 ``PartialEq, Eq, PartialOrd, Ord `` 这些 Trait :
912
+
913
+ .. code-block :: rust
914
+ :linenos:
915
+
916
+ // os/src/timer.rs
917
+
918
+ impl PartialEq for TimerCondVar {
919
+ fn eq(&self, other: &Self) -> bool {
920
+ self.expire_ms == other.expire_ms
921
+ }
922
+ }
923
+ impl Eq for TimerCondVar {}
924
+ impl PartialOrd for TimerCondVar {
925
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
926
+ let a = -(self.expire_ms as isize);
927
+ let b = -(other.expire_ms as isize);
928
+ Some(a.cmp(&b))
929
+ }
930
+ }
931
+ impl Ord for TimerCondVar {
932
+ fn cmp(&self, other: &Self) -> Ordering {
933
+ self.partial_cmp(other).unwrap()
934
+ }
935
+ }
936
+
937
+ 由于标准库提供的二叉堆 ``BinaryHeap `` 是一个大根堆,因此在实现 ``PartialOrd `` Trait 的时候我们需要将超时时间取反。这样的话,就可以把所有的 ``TimerCondVar `` 放到一个全局二叉堆 ``TIMERS `` 中:
938
+
939
+ .. code-block :: rust
940
+
941
+ // os/src/timer.rs
942
+
943
+ use alloc::collections::BinaryHeap;
944
+
945
+ lazy_static! {
946
+ static ref TIMERS: UPSafeCell<BinaryHeap<TimerCondVar>> =
947
+ unsafe { UPSafeCell::new(BinaryHeap::<TimerCondVar>::new()) };
948
+ }
949
+
950
+ 设计好数据结构之后,首先来看 sleep 系统调用如何实现:
951
+
952
+ .. code-block :: rust
953
+
954
+ // os/src/syscall/sync.rs
955
+
956
+ pub fn sys_sleep(ms: usize) -> isize {
957
+ let expire_ms = get_time_ms() + ms;
958
+ let task = current_task().unwrap();
959
+ add_timer(expire_ms, task);
960
+ block_current_and_run_next();
961
+ 0
962
+ }
963
+
964
+ // os/src/timer.rs
965
+
966
+ pub fn add_timer(expire_ms: usize, task: Arc<TaskControlBlock>) {
967
+ let mut timers = TIMERS.exclusive_access();
968
+ timers.push(TimerCondVar { expire_ms, task });
969
+ }
970
+
971
+ 在 ``sys_sleep `` 中,我们首先计算当前线程睡眠超时时间 ``expire_ms `` ,然后调用 ``adder_timer `` 生成一个 ``TimerCondVar `` 并将其加入到全局堆 ``TIMERS `` 中。注意这个过程中线程控制块是如何流动的:它被复制了一份并移动到 ``TimerCondVar `` 中,此时在处理器管理结构 ``PROCESSOR `` 中还有一份。而在调用 ``block_current_and_run_next `` 阻塞当前线程之后, ``PROCESSOR `` 中的那一份就被移除了。此后直到线程被唤醒之前,线程控制块都只存在于 ``TimerCondVar `` 中。
972
+
973
+ 在时钟中断的时候则会调用 ``check_timer `` 尝试唤醒睡眠超时的线程:
974
+
975
+ .. code-block :: rust
976
+ :linenos:
977
+
978
+ // os/src/timer.rs
979
+
980
+ pub fn check_timer() {
981
+ let current_ms = get_time_ms();
982
+ let mut timers = TIMERS.exclusive_access();
983
+ while let Some(timer) = timers.peek() {
984
+ if timer.expire_ms <= current_ms {
985
+ // 调用 wakeup_task 唤醒超时线程
986
+ wakeup_task(Arc::clone(&timer.task));
987
+ timers.pop();
988
+ } else {
989
+ break;
990
+ }
991
+ }
992
+ }
993
+
994
+ 当线程睡眠的时候退出, ``timer.rs `` 中的 ``remove_timer `` 可以移除掉线程所在的 ``TimerCondVar `` ,在此不再赘述。
995
+
850
996
基于阻塞机制实现锁机制
851
997
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
852
998
999
+ 为了在操作系统的支持下实现锁机制,我们可以将锁看成进程内的一种资源(类似文件描述符表和地址空间),一个进程可以有多把锁,这些锁可以用它们的 ID 来区分,每把锁可以用来保护不同的共享资源。进程内的所有线程均可以访问锁,但是只能通过系统调用这种间接的方式进行访问。因此,需要新增若干锁机制相关的系统调用:
1000
+
1001
+ .. code-block :: rust
1002
+ :linenos:
1003
+
1004
+ /// 功能:为当前进程新增一把互斥锁。
1005
+ /// 参数: blocking 为 true 表示互斥锁基于阻塞机制实现,
1006
+ /// 否则表示互斥锁基于类似 yield 的方法实现。
1007
+ /// 返回值:假设该操作必定成功,返回创建的锁的 ID 。
1008
+ /// syscall ID: 1010
1009
+ pub fn sys_mutex_create(blocking: bool) -> isize;
1010
+
1011
+ /// 功能:当前线程尝试获取所属进程的一把互斥锁。
1012
+ /// 参数: mutex_id 表示要获取的锁的 ID 。
1013
+ /// 返回值: 0
1014
+ /// syscall ID: 1011
1015
+ pub fn sys_mutex_lock(mutex_id: usize) -> isize;
1016
+
1017
+ /// 功能:当前线程释放所属进程的一把互斥锁。
1018
+ /// 参数: mutex_id 表示要释放的锁的 ID 。
1019
+ /// 返回值: 0
1020
+ /// syscall ID: 1012
1021
+ pub fn sys_mutex_unlock(mutex_id: usize) -> isize;
1022
+
1023
+ 下面来看我们如何基于这些系统调用实现多线程计数器:
1024
+
1025
+ .. code-block :: rust
1026
+ :linenos:
1027
+
1028
+ // user/src/bin/adder_mutex_blocking.rs
1029
+
1030
+ unsafe fn f() -> ! {
1031
+ let mut t = 2usize;
1032
+ for _ in 0..PER_THREAD {
1033
+ mutex_lock(0);
1034
+ critical_section(&mut t);
1035
+ mutex_unlock(0);
1036
+ }
1037
+ exit(t as i32)
1038
+ }
1039
+
1040
+ #[no_mangle]
1041
+ pub fn main(argc: usize, argv: &[&str]) -> i32 {
1042
+ ...
1043
+ assert_eq!(mutex_blocking_create(), 0);
1044
+ let mut v = Vec::new();
1045
+ for _ in 0..thread_count {
1046
+ v.push(thread_create(f as usize, 0) as usize);
1047
+ }
1048
+ ...
1049
+ }
1050
+
1051
+ 在第 16 行我们调用用户库提供的 ``mutex_blocking_create `` 在进程内创建一把基于阻塞实现的互斥锁,由于这是进程内创建的第一把锁,其 ID 一定为 0 。接下来,每个线程在执行的函数 ``f `` 中,在进入临界区前后分别获取(第 6 行调用 ``mutex_lock `` )和释放(第 8 行调用 ``mutex_unlock `` )在主线程中创建的 ID 为 0 的锁。这里用到的三个函数都是由用户库直接封装相关系统调用得到的。
1052
+
1053
+ 那么在内核态这些系统调用是如何实现的呢?在我们的设计中,允许同个进程内的多把锁有着不同的底层实现,这样更加灵活。因此,我们用 ``Mutex `` Trait 来规定描述一把互斥锁应该有哪些功能:
1054
+
1055
+ .. code-block :: rust
1056
+
1057
+ // os/src/sync/mutex.rs
1058
+
1059
+ pub trait Mutex: Sync + Send {
1060
+ fn lock(&self);
1061
+ fn unlock(&self);
1062
+ }
1063
+
1064
+ 其实只包含 ``lock `` 和 ``unlock `` 两个方法。然后,我们可以在进程控制块中新增互斥锁这类资源:
1065
+
1066
+ .. code-block :: rust
1067
+ :linenos:
1068
+ :emphasize-lines: 6
1069
+
1070
+ // os/src/task/process.rs
1071
+
1072
+ pub struct ProcessControlBlockInner {
1073
+ ...
1074
+ pub fd_table: Vec<Option<Arc<dyn File + Send + Sync>>>,
1075
+ pub mutex_list: Vec<Option<Arc<dyn Mutex>>>,
1076
+ ...
1077
+ }
1078
+
1079
+ 和文件描述符表 ``fd_table `` 一样, ``mutex_list `` 使用 ``Vec<Option<T>> `` 构建一个含有多个可空槽位且槽位数可以拓展的互斥锁表,表中的每个元素都实现了 ``Mutex `` Trait ,是一种互斥锁实现。
1080
+
1081
+ .. ``sys_mutex_create`` 的实现方法比较简单...
1082
+
1083
+ .. 目前,我们仅提供了两种互斥锁实现:基于阻塞机制的 ``MutexBlocking`` 和基于类似 yield 机制的 ``MutexSpin`` ,其中我们会重点介绍前者。但在此之前,首先我们来看创建互斥锁的 ``sys_mutex_create`` 是如何实现的:
1084
+
1085
+ 小结
1086
+ ----------------------------------------------------------------------
1087
+
1088
+
1089
+
1090
+
853
1091
参考文献
854
1092
----------------------------------------------------------------------
855
1093
0 commit comments