linux C++监听管道文件方式
2023-12-15 19:47:28
方式一(传统读取文件,一直监听循环读取文件)
非阻塞打开文件,用read循环定时读取,性能不好
代码如下:
#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
constexpr int kBufferSize = 256;
class FileDescriptor {
public:
explicit FileDescriptor(int fd) : fd_(fd) {}
~FileDescriptor() {
if (fd_ >= 0) {
::close(fd_);
}
}
int get() const {
return fd_;
}
private:
int fd_;
};
class PipeListener {
public:
using DataCallback = std::function<void(std::string&&)>;
PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
if (access(pipePath.c_str(), F_OK) == -1) {
if (mkfifo(pipePath.c_str(), 0666) == -1) {
std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
}
}
}
~PipeListener() {
stopListening();
}
void setDataCallback(DataCallback callback) {
dataCallback_ = std::move(callback);
}
void startListening() {
stopListening_ = false;
listeningThread_ = std::thread(&PipeListener::listenThread, this);
std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
}
void stopListening() {
stopListening_ = true;
if (listeningThread_.joinable()) {
listeningThread_.join();
std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
}
}
private:
void listenThread() {
auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));
if (fd->get() < 0) {
std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
return;
}
char buffer[kBufferSize];
while (!stopListening_) {
ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));
if (bytesRead > 0) {
std::string data(buffer, bytesRead);
if (!data.empty() && dataCallback_) {
dataCallback_(std::move(data));
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
private:
std::string pipePath_;
DataCallback dataCallback_;
std::thread listeningThread_;
bool stopListening_;
};
int main() {
PipeListener pipeListener("/home/hello/tmp/test_pipe"); // 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发
pipeListener.setDataCallback([](std::string&& data) {
std::cout << "Received size: " << data.size() << std::endl;
std::cout << "Received data: " << data << std::endl;
std::cout << "Received data (hex): ";
for (char c : data) {
std::cout << std::hex << (int)(unsigned char)c << " ";
}
std::cout << std::dec << std::endl;
});
pipeListener.startListening();
std::cout << "main" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(100));
pipeListener.stopListening();
return 0;
}
方式二(用epoll监听管道文件内容)
但是要停止程序时会阻塞在epoll_wait,等待管道消息之后才能正常退出
代码如下:
#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring>
constexpr int kBufferSize = 256;
class FileDescriptor {
public:
explicit FileDescriptor(int fd) : fd_(fd) {}
~FileDescriptor() {
if (fd_ >= 0) {
std::cout << "close fd_ " << std::endl;
::close(fd_);
}
}
int get() const {
return fd_;
}
private:
int fd_;
};
class PipeListener {
public:
using DataCallback = std::function<void(std::string&&)>;
PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
if (access(pipePath.c_str(), F_OK) == -1) {
if (mkfifo(pipePath.c_str(), 0666) == -1) {
std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
}
}
}
~PipeListener() {
stopListening();
std::cout << "close epollfd_ " << std::endl;
close(epollfd_);
}
void setDataCallback(DataCallback callback) {
dataCallback_ = std::move(callback);
}
void startListening() {
stopListening_ = false;
listeningThread_ = std::thread(&PipeListener::listenThread, this);
std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
}
void stopListening() {
stopListening_ = true;
if (listeningThread_.joinable()) {
std::cout << " wait Stopped listening on pipe: " << pipePath_ << std::endl;
listeningThread_.join();
std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
}
}
private:
void listenThread() {
auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));
if (fd->get() < 0) {
std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
return;
}
int epollfd_ = epoll_create(1);
if (epollfd_ == -1) {
std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;
return;
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET; // Enable edge-triggered mode
event.data.fd = fd->get();
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd->get(), &event) == -1) {
std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;
return;
}
char buffer[kBufferSize];
while (!stopListening_) {
struct epoll_event events[1];
int nfds = epoll_wait(epollfd_, events, 1, -1);
if (nfds == -1) {
std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;
break;
}
for (int i = 0; i < nfds; ++i) {
if (events[i].events & EPOLLIN) {
ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));
if (bytesRead > 0) {
std::string data(buffer, bytesRead);
if (!data.empty() && dataCallback_) {
dataCallback_(std::move(data));
}
}
}
}
}
}
private:
std::string pipePath_;
DataCallback dataCallback_;
std::thread listeningThread_;
bool stopListening_;
int epollfd_; // 新增的成员变量用于保存 epollfd_
};
int main() {
PipeListener pipeListener("/home/hello/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发
pipeListener.setDataCallback([](std::string&& data) {
std::cout << "Received size: " << data.size() << std::endl;
std::cout << "Received data: " << data << std::endl;
std::cout << "Received data (hex): ";
for (char c : data) {
std::cout << std::hex << (int)(unsigned char)c << " ";
}
std::cout << std::dec << std::endl;
});
pipeListener.startListening();
std::cout << "main" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
// pipeListener.stopListening();
std::cout << "main return" << std::endl;
return 0;
}
方式三(用epoll监听管道文件内容)
用epoll监听管道文件内容,对象析构或者退出时会通过epoll唤醒epoll_wait,释放资源正常退出
代码如下:
#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <condition_variable>
#include <sys/eventfd.h>
#include <cstring>
constexpr int kBufferSize = 256;
class PipeListener {
public:
using DataCallback = std::function<void(std::string&&)>;
PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(true), epollfd_(0) {
if (access(pipePath.c_str(), F_OK) == -1) {
if (mkfifo(pipePath.c_str(), 0666) == -1) {
std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
}
}
}
~PipeListener() {
stopListening();
}
void setDataCallback(DataCallback callback) {
dataCallback_ = std::move(callback);
}
void startListening() {
if (stopListening_ == false) {
std::cout << "already start Listening " << std::endl;
return;
}
stopListening_ = false;
pipe_fd_ = open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);
std::cout << "startListening pipe_fd_ ["<< pipe_fd_ <<"] " << std::endl;
if (!pipe_fd_) {
std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
return;
}
epollfd_ = epoll_create(1);
std::cout << "startListening epollfd_ ["<< epollfd_ <<"] " << std::endl;
if (epollfd_ == -1) {
std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;
return;
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET; // Enable edge-triggered mode
event.data.fd = pipe_fd_;
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, pipe_fd_, &event) == -1) {
std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;
return;
}
// 创建用于通知的 eventfd,监听要求停止监听管道文件事件,方便安全释放资源退出程序
eventfd_ = eventfd(0, EFD_NONBLOCK);
std::cout << "startListening eventfd_ ["<< eventfd_ <<"] " << std::endl;
if (eventfd_ == -1) {
std::cerr << "Failed to create eventfd. Error: " << strerror(errno) << std::endl;
return;
}
event.events = EPOLLIN | EPOLLET;
event.data.fd = eventfd_;
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, eventfd_, &event) == -1) {
std::cerr << "Failed to add eventfd to epoll. Error: " << strerror(errno) << std::endl;
return;
}
listeningThread_ = std::thread(&PipeListener::listenThread, this);
std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
}
void stopListening() {
if (stopListening_) {
std::cout << "already stop Listening " << std::endl;
return;
}
stopListening_ = true;
// 写入一个字节到 eventfd 唤醒 epoll_wait,等待安全退出
uint64_t value = 1;
write(eventfd_, &value, sizeof(value));
if (listeningThread_.joinable()) {
listeningThread_.join();
std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
}
// 从 epoll 实例中删除文件描述符
epoll_event event;
event.data.fd = pipe_fd_;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);
event.data.fd = eventfd_;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);
close(pipe_fd_);
close(eventfd_);
close(epollfd_);
std::cout << "pipe_fd_ ["<< pipe_fd_ <<"] close " << std::endl;
std::cout << "epollfd_ ["<< epollfd_ <<"] close " << std::endl;
std::cout << "eventfd_ ["<< eventfd_ <<"] close " << std::endl;
}
private:
void listenThread() {
std::cout << "listenThread start " << pipePath_ << std::endl;
char buffer[kBufferSize];
while (true) {
struct epoll_event events[2];
int nfds = epoll_wait(epollfd_, events, 2, -1);
if (stopListening_) {
break;
}
if (nfds == -1) {
std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;
}
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == pipe_fd_ && (events[i].events & EPOLLIN)) {
ssize_t bytesRead = ::read(pipe_fd_, buffer, sizeof(buffer));
if (bytesRead > 0) {
std::string data(buffer, bytesRead);
if (!data.empty() && dataCallback_) {
dataCallback_(std::move(data));
}
}
} else if (events[i].data.fd == eventfd_ && (events[i].events & EPOLLIN)) {
// 读取 eventfd,清空它
uint64_t value;
read(eventfd_, &value, sizeof(value));
}
}
}
std::cout << "listenThread exit " << pipePath_ << std::endl;
}
private:
std::string pipePath_;
DataCallback dataCallback_;
std::thread listeningThread_;
bool stopListening_ = true; // 默认状态停止的
int epollfd_;
int eventfd_;
int pipe_fd_;
};
int main() {
PipeListener pipeListener("/home/woan/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/woan/tmp/test_pipe 命令就能触发
pipeListener.setDataCallback([](std::string&& data) {
std::cout << "Received size: " << data.size() << std::endl;
std::cout << "Received data: " << data << std::endl;
std::cout << "Received data (hex): ";
for (char c : data) {
std::cout << std::hex << (int)(unsigned char)c << " ";
}
std::cout << std::dec << std::endl;
});
pipeListener.startListening();
std::cout << "main" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
pipeListener.stopListening();
std::this_thread::sleep_for(std::chrono::seconds(2));
pipeListener.startListening();
std::this_thread::sleep_for(std::chrono::seconds(10));
pipeListener.stopListening();
return 0;
}
文章来源:https://blog.csdn.net/qq_42145185/article/details/134875506
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!