【Linux】进程间通信之管道--命名管道&匿名管道通信&进程池设计
文章目录
一、进程间通信介绍
1.什么是通信
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
2.为什么要有通信以及如何进行通信
我们知道,有时候我们是需要多进程协同的完成某些业务,比如cat file | grep “hello”,此时不同的进程之间就需要进行通信。但是进程具有独立性,今天我们需要通信,那么通信的成本一定不低。首先我们需要让不同的进程看到同一份资源,然后才能进行通信。
我们该如何理解通信的本质问题呢:OS需要直接或间接的给通信双方的进程提供"内存空间",要进行通信的进程,必须看到一份公共的资源,对于不同的通信种类,本质就是前面所说的资源,是OS中哪一个模块提供的
进程间通信的方式主要有一下三类:
1.管道
匿名管道pipe
命名管道
2.System V IPC
System V 消息队列
System V 共享内存
System V 信号量
3.POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
二、管道
1.什么是管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道"
2.匿名管道
2.1什么是匿名管道
我们知道,父进程打开的文件描述符表会拷贝给子进程一份,文件描述符表中的地址指向相同的struct_file。此时父进程和子进程就看到了同一个文件,一个struct_file中有file的操作方法,有属于自己的内核缓冲区-struct_page{}。这样通过父进程fork创建子进程的方式,让两个进程看到同一份内存资源,这份资源就称为匿名管道。
父进程分别以读和写的方式打开同一个文件,然后父子进程分别关闭读端和写段,这样就可以实现父进程向管道中写入数据,子进程进行读取,反之子进程向管道中写入数据,父进程中管道中进行读取。一般而言,管道只能用来单向数据通信。而匿名管道目前能用来父子进程之间进行进程间通信。
2.2接口认识–pipe
int pipe(int pipefd[2]);
函数功能:创建一个管道
头文件:#include <unistd.h>
参数:pipefd[2],输出型参数,用于返回两个文件描述符,pipefd[0]为读端,pipefd[1]为写端
返回值:成功返回0,失败返回-1,错误码被设置
2.3进程间通信之匿名管道实现
#include <iostream>
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
// 父进程进行读取,子进程进行写入
int main()
{
// 第一步:创建管道文件,打开读写端
int fds[2];
int n = pipe(fds);
assert(n == 0);
// 第二步: fork
pid_t id = fork();
assert(id >= 0);
if (id == 0)
{
// 子进程进行写入
close(fds[0]);
// 子进程的通信代码
const char *message = "我是子进程,我正在给你发消息";
int cnt = 0;
while (true)
{
cnt++;
char buffer[1024];
snprintf(buffer, sizeof(buffer), "chile->parent say:%s[%d][%d]", message, cnt, getpid());
// 写端写满的时候,在写会阻塞,等对方进行读取!
write(fds[1], buffer, strlen(buffer));
sleep(1);
}
close(fds[1]);
cout << "子进程关闭自己的写端" << endl;
exit(0);
}
// 父进程进行读取
close(fds[1]);
// 父进程的通信代码
while (true)
{
char buffer[1024];
// cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl;
// 如果管道中没有了数据,读端在读,默认会直接阻塞当前正在读取的进程!
ssize_t s = read(fds[0], buffer, sizeof(buffer) - 1);
// cout << "BBBBBBBBBBBBBBBBBBBBBB" << endl;
if (s > 0)
{
buffer[s] = 0;
cout << "Get Message# " << buffer << " | my pid: " << getpid() << endl;
}
else if (s == 0)
{
cout << "read #" << s << endl;
break;
}
}
close(fds[0]);
int status = 0;
n = waitpid(id, &status, 0);
assert(n == id);
cout << "pid->" << n << " : " << (status & 0x7F) << endl;
return 0;
}
2.4读写特征
1.读慢,写快
我们将父进程每读一次,sleep 1秒
每次读到多次写入的数据
2.读快,写慢
我们让子进程每写一次,就sleep 1秒
由于每隔一秒才进行写入一次,那么父进程读取一次之后,等子进程下一次写入之后在 再进行读取,期间会阻塞在read处
3.写关闭,读到0
我们让子进程写入一次之后,睡眠5后,关闭写端
5秒之后,写端关闭,读到0-文件结尾
4.读关闭,OS会给进程发生信号终止写端
我们让父进程读取一次之后,关闭读端
此时,子进程接收到了13号信号
总结:
当没有数据可读时
O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候
O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
2.5管道的特征
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。管道提供流式服务
一般而言,进程退出,管道释放,所以管道的生命周期随进程
一般而言,内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
2.6 进程池设计
我们这里实现一个进行创建出多个子进程,然后通过父进程向子进程发送任务,让子进程去执行对应的任务。
代码实现:
#include <iostream>
#include <vector>
#include <functional>
#include <cassert>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#define MakeSeed() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x171237 ^ rand() % 1234)
const int processnum = 5;
typedef std::function<void()> func_t;
void downloadTask()
{
std::cout << getpid() << " 正在执行下载任务" << std::endl;
sleep(1);
}
void IOTask()
{
std::cout << getpid() << " 正在执行IO任务" << std::endl;
sleep(1);
}
void fflushTask()
{
std::cout << getpid() << " 正在执行刷新任务" << std::endl;
sleep(1);
}
void loadTaskFunc(std::vector<func_t> &funcmap)
{
funcmap.push_back(downloadTask);
funcmap.push_back(IOTask);
funcmap.push_back(fflushTask);
}
class subEp
{
public:
subEp(const pid_t &subId, const int writeFd)
: _subId(subId), _writeFd(writeFd)
{
char namebuffer[1024];
snprintf(namebuffer, sizeof namebuffer, "process-%d[pid(%d) - fd(%d)]", _num++, _subId, _writeFd);
_name = namebuffer;
}
public:
static int _num;
std::string _name;
pid_t _subId;
int _writeFd;
};
int subEp::_num = 0;
int recvTask(int fd)
{
int code = 0;
ssize_t s = read(fd, &code, sizeof(code));
if (s == 4)
return code;
else if (s <= 0)
return -1;
else
return 0;
}
void sendTask(const subEp &process, int taskNum)
{
std::cout << "send task num:" << taskNum << "send to ->" << process._name << std::endl;
int n = write(process._writeFd, &taskNum, sizeof(taskNum));
assert(n == 4);
(void)n;
}
void createSubProcess(std::vector<subEp> &subs, std::vector<func_t> &funcmap)
{
std::vector<int> deleteFd;
for (int i = 0; i < processnum; i++)
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
(void)n;
// 父进程打开的文件,是会被子进程共享的
pid_t id = fork();
if (id == 0)
{
close(fds[1]);
while (true)
{
for (int i = 0; i < deleteFd.size(); i++)
close(deleteFd[i]);
// 1. 获取命令码,如果没有发送,我们子进程应该阻塞
int commandCode = recvTask(fds[0]);
// 2. 完成任务
if (commandCode >= 0 && commandCode < funcmap.size())
funcmap[commandCode]();
else if (commandCode == -1)
break;
}
exit(0);
}
close(fds[0]);
subs.push_back(subEp(id, fds[1]));
deleteFd.push_back(fds[1]);
}
}
void loadBlanceContrl(std::vector<subEp> &subs, std::vector<func_t> &funcmap, int count)
{
int processnum = subs.size();
int tasknum = funcmap.size();
bool forever = (count == 0) ? true : false;
while (true)
{
// 1. 选择一个子进程 --> std::vector<subEp> -> index - 随机数
int subIdx = rand() % processnum;
// 2. 选择一个任务 --> std::vector<func_t> -> index
int taskIdx = rand() % tasknum;
// 3. 任务发送给选择的进程
sendTask(subs[subIdx], taskIdx);
sleep(1);
if (!forever && count > 0)
{
count--;
if (count == 0)
break;
}
}
// write quit -> read 0
for (int i = 0; i < processnum; i++)
close(subs[i]._writeFd);
}
void waitProcess(std::vector<subEp> &subs)
{
int processnum = subs.size();
for (int i = 0; i < processnum; i++)
{
waitpid(subs[i]._subId, nullptr, 0);
std::cout << "wait sub process success" << subs[i]._subId << std::endl;
}
}
int main()
{
MakeSeed();
// 1. 建立子进程并建立和子进程通信的信道
// 1.1 加载方法表
std::vector<func_t> funcMap;
loadTaskFunc(funcMap);
// 1.2 创建子进程,并且维护好父子通信信道
std::vector<subEp> subs;
createSubProcess(subs, funcMap);
// 2. 走到这里就是父进程, 控制子进程,负载均衡的向子进程发送命令码
int taskCnt = 3; // 0: 永远进行
loadBlanceContrl(subs, funcMap, taskCnt);
// 3. 回收子进程信息
waitProcess(subs);
return 0;
}
注意事项:
我们这里在创建第二个及以后的子进程的之后,会继承父进程为第一个子进程打开的写端,所以我们后面的进程就会继承父进程的多个写端,所以我们在关闭文件描述符的时候,不能关闭一个就等待一个,因为这样会被阻塞,因为前面的子进程的文件描述符被后面的继承了,你只关闭了前面的子进程的文件描述符,此时后面子进程继承的该文件描述符没有关闭,所以最终该文件描述符没有关闭,所以前面的子进程会 不会退出,所以就会阻塞在waitpid这里。
我们这里有两种解决方案,一是先统一的将所有的子进程的文件描述符进行关闭,之后再统一waitpid。
二是在创建每一个子进程之后,父进程将写文件描述符放入一个数组中,子进程执行时就关闭前面子进程继承的父进程打开的写文件描述符,这样做可以的原因是,子进程关闭写文件描述符,会发生写时拷贝,所以只会关闭前面的子进程所进程的父进程的写文件描述符
3.命名管道
匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。命名管道是一种特殊类型的文件
我们可以让不同的进程打开指定名称(路径+文件名)的同一个文件,这样就可以让不同的进程看到同一份资源。而路径+文件名保证了文件的唯一性
3.1创建一个命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
我们循环的写入"hello world"到管道中
最终发现,name_pipe的大小没有改变,因为管道文件中的数据并没有写到磁盘上
当我们使用另外一个进程进行查看name_pipe中数据的时候,就可以看到写入的数据,也就实现了两个不同的进程之间的通信
命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
函数功能:创建命名管道文件
头文件:
#include <sys/types.h>
#include <sys/stat.h>
参数:
filename:文件名
mode:文件的权限
返回值:成功返回0,失败返回-1,错误码被设置
int unlink(const char *path);
函数功能:移除管道文件
头文件:
#include <unistd.h>
参数:文件的路径
返回值:成功返回0,失败返回-1,错误码被设置,返回-1时,命名管道不会被改变
3.2用命名管道实现server&client通信
client端向server端发送信息,server收到信息之后将信息打印出来
3.2.1server.cc
#include "comm.hpp"
int main()
{
int r = ctreateFifo(NAME_PIPE);
assert(r);
(void)r;
std::cout << "server begin" << std::endl;
int rfd = open(NAME_PIPE, O_RDONLY);
std::cout << "server end" << std::endl;
assert(rfd != -1);
char buffer[1024];
while (true)
{
ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
std::cout << "client->server# " << buffer << std::endl;
}
else if (s == 0)
{
std::cout << "client quit,me too!" << std::endl;
break;
}
else
{
std::cout << "err string" << strerror(errno) << std::endl;
break;
}
}
close(rfd);
removeFifo(NAME_PIPE);
return 0;
}
3.2.2client.cc
#include "comm.hpp"
int main()
{
std::cout << "client begin" << std::endl;
int wfd = open(NAME_PIPE, O_WRONLY);
std::cout << "client end" << std::endl;
assert(wfd != -1);
(void)wfd;
char buffer[1024];
while (true)
{
std::cout << "Please say#";
fgets(buffer, sizeof(buffer), stdin);
if (strlen(buffer) > 0)
buffer[strlen(buffer) - 1] = 0;
ssize_t n = write(wfd, buffer, strlen(buffer));
assert(n == strlen(buffer));
(void)n;
}
close(wfd);
return 0;
}
3.3.3comm.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define NAME_PIPE "./named_pipe"
bool ctreateFifo(const std::string &path)
{
umask(0);
int n = mkfifo(path.c_str(), 0666);
if (n == 0)
return true;
else
{
std::cout << "errno:" << errno << "err string:" << strerror(errno) << std::endl;
return false;
}
}
void removeFifo(const std::string &path)
{
int n = unlink(path.c_str());
assert(n == 0);
(void)n;
}
3.3命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
4.匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开。
命名管道由mkfifo函数创建,打开用open
FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!