进程间通信是操作系统需要提供的一项重要功能,其中管道又是最为基础的进程间通信方式之一。管道的作用相当于提供了一个先进先出的队列,进程可以向管道中写入数据也可以从中读取数据。本文将介绍如何在uCore操作系统实验八的基础上实现匿名管道。

管道设备的源代码位于pipe文件夹下,测试程序源代码为pipe.c

管道简介

管道技术是进程间通信的一种方式,和文件不同,管道以及管道的变种不会占据任何的磁盘空间,而只是使用了内核中的一个缓冲区。如果缓冲区满了,那么进程会被阻塞。管道通常是单工的,通常只有一个发送端和一个接收端。当然,多个发送端或者多个接收端也是完全可行的。

管道技术最为常见的场景就是Shell脚本,例如脚本使用命令grep kernel | less将grep和less连接在一起,让grep的输出作为less的输入。一些程序内部也会使用管道作为进程间传输数据的的方式,例如在使用GCC编译程序的时候,使用-pipe标志可以使用管道来代替临时文件来保存中间结果,使用管道可以加速编译过程。

UNIX系统中创建管道的方式有两种:pipe()和mkfifo()。前者创建了一个匿名的管道,后者创建了一个命名的管道,两者的唯一区别就是命名管道会一直留在文件系统中直到被删除,而匿名管道不存在于文件系统中,当文件描述符被关闭后将消失。

调用pipe()创建一个匿名管道后,调用后的到两个文件描述符,其中一个用于写入,另外一个用于读取。管道通常有一个环形缓冲区和两个以上的信号量实现,环形缓冲区的大小通常是4KB,也就是一页内存大小。

只要管道处于被打开的状态,那么管道就会一直存在,当所有的发送端和所有的接收端关闭之后,管道就会被操作系统回收销毁。管道通常用于两个进程之间的通信,父进程通过fork()的方式将管道的文件描述符传递给子进程。例如,当用户在Shell中使用管道的时候,Shell解释器首先调用pipe()建立一个管道,然后调用fork()来创建一个子进程来执行程序。在使用exec()来加载应用程序之前,需要使用dup()来进行重定向。

准备工作

管道的实现基于虚拟文件系统,因此在实现管道之前,需要完成实验八中完善文件系统功能的练习。由于这部分不是文本的重点,也就不会介绍。

管道设计

管道数据结构

首先需要设计管道的数据结构,管道本质上就是一个操作系统内核管理的环形缓冲区,所以需要一块内存作为缓冲区,然后需要记录环形缓冲区的头部和尾部。当一个进程尝试从空管道读取数据或者向满管道写入数据的时候,操作系统内核需要将进程阻塞,所以还需要一个读取等待队列和一个写入等待队列。

struct pipe {
    size_t p_head;			// 缓冲区头部
    size_t p_tail;			// 缓冲区尾部
    wait_queue_t p_read_queue;		// 管道读取等待队列
    wait_queue_t p_write_queue;		// 管道写入等待队列
    char * p_buffer;			// 环形缓冲区
};
空闲
空闲
数据
数据
数据
数据
数据
数据
空闲
空闲
空闲
空闲
空闲
空闲
空闲
空闲
尾部
尾部
头部
头部
顺时针方向增长
顺时针方向增长

缓冲区大小通常设为4KB,刚好是一页内存的大小:

#define PIPE_BUFFER_SIZE 4096

在环形缓冲中,头部指针指向下次需要取出的数据,尾部指针指向下次放入的区域。按照图 1的设计,当头部指针与尾部指针相等的时候,缓冲区空;当头部指针恰好在尾部指针前一个位置的时候,缓冲区为满。所以判断缓冲区状态的代码如下:

#define pipe_empty(pip) (pip->p_head == pip->p_tail)
#define pipe_full(pip) (pip->p_head == (pip->p_tail+1)%PIPE_BUFFER_SIZE)

信息节点

接下来需要新增一种管道类型的信息节点(inode),需要在信息节点的定义中添加管道信息数据结构和管道类型号。

struct inode {
    union {
        struct pipe __pipe_info;		// 新增:管道信息
        struct device __device_info;
        struct sfs_inode __sfs_inode_info;
    } in_info;
    enum {
        inode_type_pipe_info,			// 新增:管道类型
        inode_type_device_info = 0x1234,
        inode_type_sfs_inode_info,
    } in_type;
    int ref_count;
    int open_count;
    struct fs *in_fs;
    const struct inode_ops *in_ops;
};

管道操作

除了管道的创建过程需要使用独立的系统调用外,管道的其他操作都是由虚拟文件系统控制的,这也就意味着需要提供一组操作接口供虚拟文件系统使用。

static const struct inode_ops pipe_node_ops = {
    .vop_magic                      = VOP_MAGIC,
    .vop_close                      = pipe_close,		// 关闭管道
    .vop_read                       = pipe_read,		// 读取管道
    .vop_write                      = pipe_write,		// 写入管道
    .vop_reclaim                    = pipe_reclaim	// 管道回收
};

管道实现

管道创建

操作系统内核需要以系统调用的形式提供一个创建管道的接口。

pipe
[Not supported by viewer]
sys_pipe
[Not supported by viewer]
用户态
用户态
内核态
内核态
sys_pipe
sys_pipe
系统调用
系统调用
sysfile_pipe
sysfile_pipe
file_pipe
file_pipe

首先需要创建管道的信息节点:

struct inode * pipe_create_inode() {
    char *buffer;
    // 分配缓冲区内存
    if ((buffer = kmalloc(PIPE_BUFFER_SIZE)) == NULL) {
        return NULL;
    }
    struct inode *node;
    // 分配信息节点
    if ((node = alloc_inode(pipe)) == NULL) {
        kfree(buffer);
        return NULL;
    }
    // 初始化信息节点
    vop_init(node, &pipe_node_ops, NULL);
    struct pipe *pip = vop_info(node, pipe);
    pip->p_buffer = buffer;
    pip->p_head = 0;
    pip->p_tail = 0;
    wait_queue_init(&(pip->p_read_queue));		// 初始化读队列
    wait_queue_init(&(pip->p_write_queue));		// 初始化写队列
    return node;
}

进程需要得到管道的文件描述符用于读写,所以管道创建接口在创建好信息节点之后,还需要创建两个文件描述符,一个文件描述符只读,另外一个文件描述符只写。

int file_pipe(int *fd_store) {
    int ret = 0;
    // 分配文件描述符结构
    struct file *file_in, *file_out;
    if ((ret = fd_array_alloc(NO_FD, &file_in)) != 0) {
        return ret;
    }
    if ((ret = fd_array_alloc(NO_FD, &file_out)) != 0) {
        fd_array_free(file_in);
        return ret;
    }
    // 创建信息节点
    struct inode *node;
    if ((node = pipe_create_inode()) == NULL) {
        fd_array_free(file_in);
        fd_array_free(file_out);
        return -E_NO_MEM;
    }
    // 设置信息节点计数值
    vop_ref_inc(node);
    vop_open_inc(node);
    vop_open_inc(node);
    // 初始化文件描述符
    file_in->pos = 0;
    file_in->node = node;
    file_in->readable = 1;
    file_in->writable = 0;
    fd_array_open(file_in); 	// 将文件描述符设置为打开状态
    file_out->pos = 0;
    file_out->node = node;
    file_out->readable = 0;
    file_out->writable = 1;
    fd_array_open(file_out);	// 将文件描述符设置为打开状态
    // 返回未见描述符编号
    fd_store[0] = file_in->fd;
    fd_store[1] = file_out->fd;
    return ret;
}

在设置信息节点计数值的时候,代码看起来非常奇怪,vop_ref_inc调用了一次,但是vop_open_inc调用了两次。这是因为在初始化信息节点的时候,信息节点的引用计数值默认为1,打开计数值默认为0,由于管道只被文件描述符引用,不存在于实际的文件系统中,也就是打开计数值永远等于引用计数值,因此初始化设置的默认值显得多余了,

管道关闭

当管道信息节点的打开计数变为0后,管道关闭操作就会被虚拟文件系统调用,对于管道来说,引用计数值等于打开计数值,当管道相关的文件描述符全部关闭后,由回收操作进行内存资源回收,关闭操作不需要做任何工作。

static int pipe_close(struct inode *node) {
    return 0;
}

管道回收

当管道的全部文件描述符关闭后,虚拟文件系统调用管道回收操作回收缓冲区内存以及信息节点使用的内存。

static int pipe_reclaim(struct inode *node) {
    struct pipe *pip = vop_info(node, pipe);
    kfree(pip->p_buffer);
    vop_kill(node);
    return 0;
}

管道读取

read
[Not supported by viewer]
sys_read
[Not supported by viewer]
用户态
用户态
内核态
内核态
sys_read
sys_read
系统调用
系统调用
sysfile_read
sysfile_read
file_read
file_read
vop_read
<span>vop_read</span>
pipe_read
<span>pipe_read</span>

读取过程采用逐字节读取的方式读取数据,对于每一个字节的数据:

  • 如果缓冲区非空,那么取出一个字节后,唤醒处于写入等待队列中的进程,然后尝试下一个字节;
  • 如果缓冲区为空, 那么进程将自己加入读取等待队列,主动进入阻塞状态,等待进程被唤醒后再次尝试;
  • 如果全部要求的数据全部读取完毕,那么结束读取过程,返回读取到的字节数。
static int pipe_read(struct inode *node, struct iobuf *iob) {
    struct pipe *pip = vop_info(node, pipe);
    int ret = 0;
    bool intr_flag;
    local_intr_save(intr_flag);
    {
        for (; ret < iob->io_resid; ret ++, pip->p_head = (pip->p_head+1)%PIPE_BUFFER_SIZE) {
        try_again:
            if (!pipe_empty(pip)) {
                *(char *)((iob->io_base) ++) = pip->p_buffer[pip->p_head];
                if (!wait_queue_empty(&(pip->p_write_queue))) {
                    wakeup_queue(&(pip->p_write_queue), WT_IPC, 1);
                }
            }
            else {
                wait_t __wait, *wait = &__wait;
                wait_current_set(&(pip->p_read_queue), wait, WT_IPC);
                local_intr_restore(intr_flag);

                schedule();

                local_intr_save(intr_flag);
                wait_current_del(&(pip->p_read_queue), wait);
                if (wait->wakeup_flags == WT_IPC) {
                    goto try_again;
                }
                break;
            }
        }
    }
    local_intr_restore(intr_flag);
    iob->io_resid -= ret;
    return ret;
}

管道写入

write
[Not supported by viewer]
sys_write
[Not supported by viewer]
用户态
用户态
内核态
内核态
sys_write
sys_write
系统调用
系统调用
sysfile_write
sysfile_write
file_write
file_write
vop_write
[Not supported by viewer]
pipe_write
<span>pipe_write</span>

写入过程和读取过程非常类似,采用逐字节读取的方式写入数据,对于每一个字节的数据:

  • 如果缓冲区非满,那么写入一个字节后,唤醒处于读取等待队列中的进程,然后尝试下一个字节;
  • 如果缓冲区为满, 那么进程将自己加入写入等待队列,主动进入阻塞状态,等待进程被唤醒后再次尝试;
  • 如果全部要求的数据全部写入完毕,那么结束读取过程,返回写入的字节数。
static int pipe_write(struct inode *node, struct iobuf *iob) {
    struct pipe *pip = vop_info(node, pipe);
    int ret = 0;
    bool intr_flag;
    local_intr_save(intr_flag);
    {
        for (; ret < iob->io_resid; ret ++, pip->p_tail = (pip->p_tail+1)%PIPE_BUFFER_SIZE) {
        try_again:
            if (!pipe_full(pip)) {
                pip->p_buffer[pip->p_tail] = *(char *)((iob->io_base) ++);
                if (!wait_queue_empty(&(pip->p_read_queue))) {
                    wakeup_queue(&(pip->p_read_queue), WT_IPC, 1);
                }
            }
            else {
                wait_t __wait, *wait = &__wait;
                wait_current_set(&(pip->p_write_queue), wait, WT_IPC);
                local_intr_restore(intr_flag);

                schedule();

                local_intr_save(intr_flag);
                wait_current_del(&(pip->p_write_queue), wait);
                if (wait->wakeup_flags == WT_IPC) {
                    goto try_again;
                }
                break;
            }
        }
    }
    local_intr_restore(intr_flag);
    iob->io_resid -= ret;
    return ret;
}

管道测试

将uCore中Shell程序创建管道的代码取消注释开启管道功能。

img

为了测试管道功能,测试方案如下:

  • 使用回显程序显示程序的输出;
  • 使用管道测试程序测试管道读写。

测试结果如下

img

参考文献

  1. OSDev.org, Unix Pipes, http://wiki.osdev.org/Unix_Pipes