【Linux】进程间通信之管道--命名管道&匿名管道通信&进程池设计

2023-12-15 22:13:47

一、进程间通信介绍

1.什么是通信

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源。

通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。

进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

2.为什么要有通信以及如何进行通信

我们知道,有时候我们是需要多进程协同的完成某些业务,比如cat file | grep “hello”,此时不同的进程之间就需要进行通信。但是进程具有独立性,今天我们需要通信,那么通信的成本一定不低。首先我们需要让不同的进程看到同一份资源,然后才能进行通信。

我们该如何理解通信的本质问题呢:OS需要直接或间接的给通信双方的进程提供"内存空间",要进行通信的进程,必须看到一份公共的资源,对于不同的通信种类,本质就是前面所说的资源,是OS中哪一个模块提供的

进程间通信的方式主要有一下三类:

1.管道

匿名管道pipe

命名管道

2.System V IPC

System V 消息队列

System V 共享内存

System V 信号量

3.POSIX IPC

消息队列

共享内存

信号量

互斥量

条件变量

读写锁

二、管道

1.什么是管道

管道是Unix中最古老的进程间通信的形式。

我们把从一个进程连接到另一个进程的一个数据流称为一个“管道"

在这里插入图片描述

2.匿名管道

2.1什么是匿名管道

我们知道,父进程打开的文件描述符表会拷贝给子进程一份,文件描述符表中的地址指向相同的struct_file。此时父进程和子进程就看到了同一个文件,一个struct_file中有file的操作方法,有属于自己的内核缓冲区-struct_page{}。这样通过父进程fork创建子进程的方式,让两个进程看到同一份内存资源,这份资源就称为匿名管道。

在这里插入图片描述

父进程分别以读和写的方式打开同一个文件,然后父子进程分别关闭读端和写段,这样就可以实现父进程向管道中写入数据,子进程进行读取,反之子进程向管道中写入数据,父进程中管道中进行读取。一般而言,管道只能用来单向数据通信。而匿名管道目前能用来父子进程之间进行进程间通信。

在这里插入图片描述

2.2接口认识–pipe

int pipe(int pipefd[2]);
函数功能:创建一个管道
头文件:#include <unistd.h>
参数:pipefd[2],输出型参数,用于返回两个文件描述符,pipefd[0]为读端,pipefd[1]为写端
返回值:成功返回0,失败返回-1,错误码被设置

2.3进程间通信之匿名管道实现

#include <iostream>
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

using namespace std;

// 父进程进行读取,子进程进行写入
int main()
{
    // 第一步:创建管道文件,打开读写端
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);

    // 第二步: fork
    pid_t id = fork();
    assert(id >= 0);
    if (id == 0)
    {
        // 子进程进行写入
        close(fds[0]);
        // 子进程的通信代码
        const char *message = "我是子进程,我正在给你发消息";
        int cnt = 0;
        while (true)
        {
            cnt++;
            char buffer[1024];
            snprintf(buffer, sizeof(buffer), "chile->parent say:%s[%d][%d]", message, cnt, getpid());
            // 写端写满的时候,在写会阻塞,等对方进行读取!
            write(fds[1], buffer, strlen(buffer));
            sleep(1);
        }
        
        close(fds[1]);
        cout << "子进程关闭自己的写端" << endl;
        exit(0);
    }

    // 父进程进行读取
    close(fds[1]);
    // 父进程的通信代码
    while (true)
    {
        char buffer[1024];
        // cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl;
        // 如果管道中没有了数据,读端在读,默认会直接阻塞当前正在读取的进程!
        ssize_t s = read(fds[0], buffer, sizeof(buffer) - 1);
        // cout << "BBBBBBBBBBBBBBBBBBBBBB" << endl;
        if (s > 0)
        {
            buffer[s] = 0;
            cout << "Get Message# " << buffer << " | my pid: " << getpid() << endl;
        }
        else if (s == 0)
        {
            cout << "read #" << s << endl;
            break;
        }
    }
    close(fds[0]);

    int status = 0;
    n = waitpid(id, &status, 0);

    assert(n == id);

    cout << "pid->" << n << " : " << (status & 0x7F) << endl;
    return 0;
}

2.4读写特征

1.读慢,写快

我们将父进程每读一次,sleep 1秒

在这里插入图片描述

每次读到多次写入的数据

2.读快,写慢

我们让子进程每写一次,就sleep 1秒

在这里插入图片描述

由于每隔一秒才进行写入一次,那么父进程读取一次之后,等子进程下一次写入之后在 再进行读取,期间会阻塞在read处

3.写关闭,读到0

我们让子进程写入一次之后,睡眠5后,关闭写端

在这里插入图片描述

5秒之后,写端关闭,读到0-文件结尾

4.读关闭,OS会给进程发生信号终止写端

我们让父进程读取一次之后,关闭读端

在这里插入图片描述

此时,子进程接收到了13号信号

在这里插入图片描述

总结:

当没有数据可读时

O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。

O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。

当管道满的时候

O_NONBLOCK disable: write调用阻塞,直到有进程读走数据

O_NONBLOCK enable:调用返回-1,errno值为EAGAIN

如果所有管道写端对应的文件描述符被关闭,则read返回0

如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出

当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。

当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。

2.5管道的特征

只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。管道提供流式服务

一般而言,进程退出,管道释放,所以管道的生命周期随进程

一般而言,内核会对管道操作进行同步与互斥

管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道

在这里插入图片描述

2.6 进程池设计

我们这里实现一个进行创建出多个子进程,然后通过父进程向子进程发送任务,让子进程去执行对应的任务。

代码实现:

#include <iostream>
#include <vector>
#include <functional>
#include <cassert>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#define MakeSeed() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234)

const int processnum = 5;
typedef std::function<void()> func_t;

void downloadTask()
{
    std::cout << getpid() << " 正在执行下载任务" << std::endl;
    sleep(1);
}

void IOTask()
{
    std::cout << getpid() << " 正在执行IO任务" << std::endl;
    sleep(1);
}

void fflushTask()
{
    std::cout << getpid() << " 正在执行刷新任务" << std::endl;
    sleep(1);
}

void loadTaskFunc(std::vector<func_t> &funcmap)
{
    funcmap.push_back(downloadTask);
    funcmap.push_back(IOTask);
    funcmap.push_back(fflushTask);
}

class subEp
{
public:
    subEp(const pid_t &subId, const int writeFd)
        : _subId(subId), _writeFd(writeFd)
    {
        char namebuffer[1024];
        snprintf(namebuffer, sizeof namebuffer, "process-%d[pid(%d) - fd(%d)]", _num++, _subId, _writeFd);
        _name = namebuffer;
    }

public:
    static int _num;
    std::string _name;
    pid_t _subId;
    int _writeFd;
};
int subEp::_num = 0;

int recvTask(int fd)
{
    int code = 0;
    ssize_t s = read(fd, &code, sizeof(code));
    if (s == 4)
        return code;
    else if (s <= 0)
        return -1;
    else
        return 0;
}

void sendTask(const subEp &process, int taskNum)
{
    std::cout << "send task num:" << taskNum << "send to ->" << process._name << std::endl;
    int n = write(process._writeFd, &taskNum, sizeof(taskNum));
    assert(n == 4);
    (void)n;
}

void createSubProcess(std::vector<subEp> &subs, std::vector<func_t> &funcmap)
{
    std::vector<int> deleteFd;
    for (int i = 0; i < processnum; i++)
    {
        int fds[2];
        int n = pipe(fds);
        assert(n == 0);
        (void)n;
        // 父进程打开的文件,是会被子进程共享的

        pid_t id = fork();
        if (id == 0)
        {
            close(fds[1]);
            while (true)
            {
                for (int i = 0; i < deleteFd.size(); i++)
                    close(deleteFd[i]);
                // 1. 获取命令码,如果没有发送,我们子进程应该阻塞
                int commandCode = recvTask(fds[0]);
                // 2. 完成任务
                if (commandCode >= 0 && commandCode < funcmap.size())
                    funcmap[commandCode]();
                else if (commandCode == -1)
                    break;
            }
            exit(0);
        }
        close(fds[0]);
        subs.push_back(subEp(id, fds[1]));
        deleteFd.push_back(fds[1]);
    }
}

void loadBlanceContrl(std::vector<subEp> &subs, std::vector<func_t> &funcmap, int count)
{
    int processnum = subs.size();
    int tasknum = funcmap.size();
    bool forever = (count == 0) ? true : false;

    while (true)
    {
        // 1. 选择一个子进程 --> std::vector<subEp> -> index - 随机数
        int subIdx = rand() % processnum;
        // 2. 选择一个任务 --> std::vector<func_t> -> index
        int taskIdx = rand() % tasknum;
        // 3. 任务发送给选择的进程
        sendTask(subs[subIdx], taskIdx);
        sleep(1);
        if (!forever && count > 0)
        {
            count--;
            if (count == 0)
                break;
        }
    }

    // write quit -> read 0
    for (int i = 0; i < processnum; i++)
        close(subs[i]._writeFd);
}

void waitProcess(std::vector<subEp> &subs)
{
    int processnum = subs.size();
    for (int i = 0; i < processnum; i++)
    {
        waitpid(subs[i]._subId, nullptr, 0);
        std::cout << "wait sub process success" << subs[i]._subId << std::endl;
    }
}

int main()
{
    MakeSeed();
    // 1. 建立子进程并建立和子进程通信的信道
    // 1.1 加载方法表
    std::vector<func_t> funcMap;
    loadTaskFunc(funcMap);
    // 1.2 创建子进程,并且维护好父子通信信道
    std::vector<subEp> subs;
    createSubProcess(subs, funcMap);

    // 2. 走到这里就是父进程, 控制子进程,负载均衡的向子进程发送命令码
    int taskCnt = 3; // 0: 永远进行
    loadBlanceContrl(subs, funcMap, taskCnt);

    // 3. 回收子进程信息
    waitProcess(subs);
    return 0;
}

注意事项:

我们这里在创建第二个及以后的子进程的之后,会继承父进程为第一个子进程打开的写端,所以我们后面的进程就会继承父进程的多个写端,所以我们在关闭文件描述符的时候,不能关闭一个就等待一个,因为这样会被阻塞,因为前面的子进程的文件描述符被后面的继承了,你只关闭了前面的子进程的文件描述符,此时后面子进程继承的该文件描述符没有关闭,所以最终该文件描述符没有关闭,所以前面的子进程会 不会退出,所以就会阻塞在waitpid这里。

我们这里有两种解决方案,一是先统一的将所有的子进程的文件描述符进行关闭,之后再统一waitpid。

二是在创建每一个子进程之后,父进程将写文件描述符放入一个数组中,子进程执行时就关闭前面子进程继承的父进程打开的写文件描述符,这样做可以的原因是,子进程关闭写文件描述符,会发生写时拷贝,所以只会关闭前面的子进程所进程的父进程的写文件描述符

3.命名管道

匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。

如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。命名管道是一种特殊类型的文件

我们可以让不同的进程打开指定名称(路径+文件名)的同一个文件,这样就可以让不同的进程看到同一份资源。而路径+文件名保证了文件的唯一性

3.1创建一个命名管道

命名管道可以从命令行上创建,命令行方法是使用下面这个命令:

mkfifo filename

在这里插入图片描述

我们循环的写入"hello world"到管道中

在这里插入图片描述

最终发现,name_pipe的大小没有改变,因为管道文件中的数据并没有写到磁盘上

在这里插入图片描述

当我们使用另外一个进程进行查看name_pipe中数据的时候,就可以看到写入的数据,也就实现了两个不同的进程之间的通信

命名管道也可以从程序里创建,相关函数有:

int mkfifo(const char *filename,mode_t mode);
函数功能:创建命名管道文件
头文件:
#include <sys/types.h>
#include <sys/stat.h>
参数:
filename:文件名
mode:文件的权限
返回值:成功返回0,失败返回-1,错误码被设置

int unlink(const char *path);
函数功能:移除管道文件
头文件:
#include <unistd.h>
参数:文件的路径
返回值:成功返回0,失败返回-1,错误码被设置,返回-1时,命名管道不会被改变

3.2用命名管道实现server&client通信

client端向server端发送信息,server收到信息之后将信息打印出来

3.2.1server.cc
#include "comm.hpp"

int main()
{
    int r = ctreateFifo(NAME_PIPE);
    assert(r);
    (void)r;

    std::cout << "server begin" << std::endl;
    int rfd = open(NAME_PIPE, O_RDONLY);
    std::cout << "server end" << std::endl;
    assert(rfd != -1);

    char buffer[1024];
    while (true)
    {
        ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            std::cout << "client->server# " << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "client quit,me too!" << std::endl;
            break;
        }
        else
        {
            std::cout << "err string" << strerror(errno) << std::endl;
            break;
        }
    }

    close(rfd);
    removeFifo(NAME_PIPE);
    return 0;
}
3.2.2client.cc
#include "comm.hpp"

int main()
{
    std::cout << "client begin" << std::endl;
    int wfd = open(NAME_PIPE, O_WRONLY);
    std::cout << "client end" << std::endl;
    assert(wfd != -1);
    (void)wfd;

    char buffer[1024];
    while (true)
    {
        std::cout << "Please say#";
        fgets(buffer, sizeof(buffer), stdin);
        if (strlen(buffer) > 0)
            buffer[strlen(buffer) - 1] = 0;
        ssize_t n = write(wfd, buffer, strlen(buffer));
        assert(n == strlen(buffer));
        (void)n;
    }

    close(wfd);
    return 0;
}
3.3.3comm.hpp
#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NAME_PIPE "./named_pipe"

bool ctreateFifo(const std::string &path)
{
    umask(0);
    int n = mkfifo(path.c_str(), 0666);
    if (n == 0)
        return true;
    else
    {
        std::cout << "errno:" << errno << "err string:" << strerror(errno) << std::endl;
        return false;
    }
}

void removeFifo(const std::string &path)
{
    int n = unlink(path.c_str());
    assert(n == 0);
    (void)n;
}

3.3命名管道的打开规则

如果当前打开操作是为读而打开FIFO时

O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO

O_NONBLOCK enable:立刻返回成功

如果当前打开操作是为写而打开FIFO时

O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO

O_NONBLOCK enable:立刻返回失败,错误码为ENXIO

4.匿名管道与命名管道的区别

匿名管道由pipe函数创建并打开。

命名管道由mkfifo函数创建,打开用open

FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。

文章来源:https://blog.csdn.net/qq_67582098/article/details/134885009
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。