Linux C++ 监听管道文件(FIFO)的三种方式
2023-12-15 19:47:28
方式一(传统读取文件,循环轮询读取)
以非阻塞方式打开管道文件,在循环中每隔 100ms 调用一次 read 读取;即使没有数据线程也会被周期性唤醒,并且数据最长要延迟 100ms 才会被处理,性能不好。
代码如下:
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
// Size, in bytes, of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;
/**
 * Owning wrapper around a POSIX file descriptor.
 *
 * The descriptor is closed when the wrapper is destroyed. A negative value
 * means "no descriptor" and is never passed to close(). Copying is disabled so
 * that two wrappers can never close the same descriptor; ownership can only be
 * transferred by moving, which leaves the source holding -1.
 */
class FileDescriptor {
public:
    /// Takes ownership of `fd`. `fd` may be negative (e.g. a failed open()).
    explicit FileDescriptor(int fd) : fd_(fd) {}
    ~FileDescriptor() {
        closeIfOpen();
    }
    FileDescriptor(const FileDescriptor&) = delete;
    FileDescriptor& operator=(const FileDescriptor&) = delete;
    /// Steals the descriptor from `other`, which is left empty (-1).
    FileDescriptor(FileDescriptor&& other) noexcept : fd_(other.fd_) {
        other.fd_ = -1;
    }
    /// Closes the currently owned descriptor (if any), then steals `other`'s.
    FileDescriptor& operator=(FileDescriptor&& other) noexcept {
        if (this != &other) {
            closeIfOpen();
            fd_ = other.fd_;
            other.fd_ = -1;
        }
        return *this;
    }
    /// Returns the raw descriptor without giving up ownership.
    int get() const {
        return fd_;
    }
private:
    // Closes the descriptor when one is held and marks the wrapper empty.
    void closeIfOpen() {
        if (fd_ >= 0) {
            ::close(fd_);
            fd_ = -1;
        }
    }
    int fd_;
};
class PipeListener {
public:
    using DataCallback = std::function<void(std::string&&)>;
    PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
        if (access(pipePath.c_str(), F_OK) == -1) {
            if (mkfifo(pipePath.c_str(), 0666) == -1) {
                std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            }
        }
    }
    ~PipeListener() {
        stopListening();
    }
    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }
    void startListening() {
        stopListening_ = false;
        listeningThread_ = std::thread(&PipeListener::listenThread, this);
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }
    void stopListening() {
        stopListening_ = true;
        if (listeningThread_.joinable()) {
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }
    }
private:
    void listenThread() {
        auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));
        if (fd->get() < 0) {
            std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            return;
        }
        char buffer[kBufferSize];
        while (!stopListening_) {
            ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));
            if (bytesRead > 0) {
                std::string data(buffer, bytesRead);
                if (!data.empty() && dataCallback_) {
                    dataCallback_(std::move(data));
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_;
};
// Demo: listen on the pipe for 100 seconds and dump every chunk received.
int main() {
    // To test: once the program is running, run
    //   echo "bt_upgrade" > /home/hello/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/hello/tmp/test_pipe");

    // Prints the chunk's size, its raw text, and each byte in hexadecimal.
    const auto dumpPayload = [](std::string&& payload) {
        std::cout << "Received size: " << payload.size() << std::endl;
        std::cout << "Received data: " << payload << std::endl;
        std::cout << "Received data (hex): ";
        for (std::string::size_type i = 0; i < payload.size(); ++i) {
            const unsigned char byte = static_cast<unsigned char>(payload[i]);
            std::cout << std::hex << static_cast<int>(byte) << " ";
        }
        std::cout << std::dec << std::endl;
    };
    pipeListener.setDataCallback(dumpPayload);

    pipeListener.startListening();
    std::cout << "main" << std::endl;

    const std::chrono::seconds listenDuration(100);
    std::this_thread::sleep_for(listenDuration);

    pipeListener.stopListening();
    return 0;
}
方式二(用epoll监听管道文件内容)
用 epoll 监听管道文件,只有管道中有数据时线程才会被唤醒,不需要定时轮询。缺点是 epoll_wait 使用了无限超时,停止程序时监听线程会一直阻塞在 epoll_wait 上,要等管道再收到一条消息之后才能正常退出。
代码如下:
#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring> 
// Size, in bytes, of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;
/**
 * Owning wrapper around a POSIX file descriptor.
 *
 * The descriptor is closed (and "close fd_ " is logged) when the wrapper is
 * destroyed. A negative value means "no descriptor" and is never passed to
 * close(). Copying is disabled so that two wrappers can never close the same
 * descriptor; ownership can only be transferred by moving, which leaves the
 * source holding -1.
 */
class FileDescriptor {
public:
    /// Takes ownership of `fd`. `fd` may be negative (e.g. a failed open()).
    explicit FileDescriptor(int fd) : fd_(fd) {}
    ~FileDescriptor() {
        closeIfOpen();
    }
    FileDescriptor(const FileDescriptor&) = delete;
    FileDescriptor& operator=(const FileDescriptor&) = delete;
    /// Steals the descriptor from `other`, which is left empty (-1).
    FileDescriptor(FileDescriptor&& other) noexcept : fd_(other.fd_) {
        other.fd_ = -1;
    }
    /// Closes the currently owned descriptor (if any), then steals `other`'s.
    FileDescriptor& operator=(FileDescriptor&& other) noexcept {
        if (this != &other) {
            closeIfOpen();
            fd_ = other.fd_;
            other.fd_ = -1;
        }
        return *this;
    }
    /// Returns the raw descriptor without giving up ownership.
    int get() const {
        return fd_;
    }
private:
    // Logs and closes the descriptor when one is held, then marks the wrapper empty.
    void closeIfOpen() {
        if (fd_ >= 0) {
            std::cout << "close fd_ " << std::endl;
            ::close(fd_);
            fd_ = -1;
        }
    }
    int fd_;
};
class PipeListener {
public:
    using DataCallback = std::function<void(std::string&&)>;
    PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
        if (access(pipePath.c_str(), F_OK) == -1) {
            if (mkfifo(pipePath.c_str(), 0666) == -1) {
                std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            }
        }
    }
    ~PipeListener() {
        stopListening();
        std::cout << "close epollfd_ " << std::endl;
        close(epollfd_);
    }
    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }
    void startListening() {
        stopListening_ = false;
        listeningThread_ = std::thread(&PipeListener::listenThread, this);
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }
    void stopListening() {
        stopListening_ = true;
        if (listeningThread_.joinable()) {
            std::cout << " wait Stopped listening on pipe: " << pipePath_ << std::endl;
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }
    }
private:
    void listenThread() {
        auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));
        if (fd->get() < 0) {
            std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            return;
        }
        int epollfd_ = epoll_create(1);
        if (epollfd_ == -1) {
            std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;
            return;
        }
        struct epoll_event event;
        event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered mode
        event.data.fd = fd->get();
        if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd->get(), &event) == -1) {
            std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;
            return;
        }
        char buffer[kBufferSize];
        while (!stopListening_) {
            struct epoll_event events[1];
            int nfds = epoll_wait(epollfd_, events, 1, -1);
            if (nfds == -1) {
                std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;
                break;
            }
            for (int i = 0; i < nfds; ++i) {
                if (events[i].events & EPOLLIN) {
                    ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));
                    if (bytesRead > 0) {
                        std::string data(buffer, bytesRead);
                        if (!data.empty() && dataCallback_) {
                            dataCallback_(std::move(data));
                        }
                    }
                }
            }
        }
    }
private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_;
    int epollfd_;  // 新增的成员变量用于保存 epollfd_
};
// Demo: start the epoll listener, wait one second, then leave main().
int main() {
    // To test: once the program is running, run
    //   echo "bt_upgrade" > /home/hello/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/hello/tmp/test_pipe");

    // Prints the chunk's size, its raw text, and each byte in hexadecimal.
    const auto dumpPayload = [](std::string&& payload) {
        std::cout << "Received size: " << payload.size() << std::endl;
        std::cout << "Received data: " << payload << std::endl;
        std::cout << "Received data (hex): ";
        for (std::string::size_type i = 0; i < payload.size(); ++i) {
            const unsigned char byte = static_cast<unsigned char>(payload[i]);
            std::cout << std::hex << static_cast<int>(byte) << " ";
        }
        std::cout << std::dec << std::endl;
    };
    pipeListener.setDataCallback(dumpPayload);

    pipeListener.startListening();
    std::cout << "main" << std::endl;

    const std::chrono::seconds listenDuration(1);
    std::this_thread::sleep_for(listenDuration);

    // stopListening() is intentionally not called here; the PipeListener
    // destructor calls it when pipeListener goes out of scope.
    std::cout << "main return" << std::endl;
    return 0;
}
方式三(用epoll监听管道文件内容,并用eventfd唤醒退出)
用 epoll 同时监听管道文件和一个 eventfd。对象析构或者调用 stopListening 时向 eventfd 写入数据来唤醒 epoll_wait,监听线程随即退出,资源得以释放,程序可以正常退出。
代码如下:
#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <condition_variable>
#include <sys/eventfd.h>
#include <cstring>  
// Size, in bytes, of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;
/**
 * epoll-based listener for a named pipe (FIFO) that can be stopped promptly.
 *
 * startListening() opens the pipe, an epoll instance and an eventfd, then
 * starts a background thread that waits on both descriptors. Every chunk read
 * from the pipe (at most kBufferSize bytes each) is handed to the data
 * callback on that background thread. stopListening() writes to the eventfd,
 * which wakes epoll_wait() so the thread can exit; the thread is then joined
 * and all descriptors are closed. The listener can be started again afterwards.
 *
 * The public methods are meant to be called from a single owning thread, and
 * the callback must be installed before startListening() is called.
 */
class PipeListener {
public:
    using DataCallback = std::function<void(std::string&&)>;
    /// Creates the FIFO at `pipePath` unless something already exists there.
    PipeListener(const std::string& pipePath) : pipePath_(pipePath) {
        // mkfifo() reports EEXIST itself, so no separate access() check is
        // needed (and there is no window between checking and creating).
        if (mkfifo(pipePath_.c_str(), 0666) == -1 && errno != EEXIST) {
            const int savedErrno = errno;
            std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(savedErrno) << std::endl;
        }
    }
    ~PipeListener() {
        stopListening();
    }
    /// Installs the callback that receives each chunk; call before startListening().
    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }
    /**
     * Opens all descriptors and starts the listening thread.
     *
     * On any failure the error is logged, every descriptor opened so far is
     * closed again and the listener stays in the stopped state, so a later
     * startListening() or stopListening() call remains safe.
     */
    void startListening() {
        if (!stopListening_) {
            std::cout << "already start Listening " << std::endl;
            return;
        }
        pipe_fd_ = ::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);
        int savedErrno = errno;  // captured before the stream output below can change it
        std::cout << "startListening pipe_fd_ ["<< pipe_fd_ <<"] " << std::endl;
        if (pipe_fd_ < 0) {  // open() signals failure with -1; 0 is a valid descriptor
            abortStart("Failed to open the pipe file: " + pipePath_ + ". Error: ", savedErrno);
            return;
        }
        epollfd_ = epoll_create(1);
        savedErrno = errno;
        std::cout << "startListening epollfd_ ["<< epollfd_ <<"] " << std::endl;
        if (epollfd_ == -1) {
            abortStart("Failed to create epoll instance. Error: ", savedErrno);
            return;
        }
        epoll_event event{};
        event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered mode
        event.data.fd = pipe_fd_;
        if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, pipe_fd_, &event) == -1) {
            abortStart("Failed to add file descriptor to epoll. Error: ", errno);
            return;
        }
        // eventfd used purely as a wake-up channel: stopListening() writes to
        // it so that the thread blocked in epoll_wait() can exit cleanly.
        eventfd_ = eventfd(0, EFD_NONBLOCK);
        savedErrno = errno;
        std::cout << "startListening eventfd_ ["<< eventfd_ <<"] " << std::endl;
        if (eventfd_ == -1) {
            abortStart("Failed to create eventfd. Error: ", savedErrno);
            return;
        }
        // Level-triggered on purpose: the stop request stays visible to
        // epoll_wait() until the thread has actually consumed it.
        event.events = EPOLLIN;
        event.data.fd = eventfd_;
        if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, eventfd_, &event) == -1) {
            abortStart("Failed to add eventfd to epoll. Error: ", errno);
            return;
        }
        try {
            listeningThread_ = std::thread(&PipeListener::listenThread, this);
        } catch (...) {
            closeDescriptors();  // do not leak descriptors if the thread cannot start
            throw;
        }
        // Only now is the listener really running.
        stopListening_ = false;
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }
    /// Wakes the listening thread, joins it and closes every descriptor.
    void stopListening() {
        if (stopListening_) {
            std::cout << "already stop Listening " << std::endl;
            return;
        }
        stopListening_ = true;
        // Add 1 to the eventfd counter; this makes it readable and wakes epoll_wait().
        const uint64_t value = 1;
        ssize_t written;
        do {
            written = ::write(eventfd_, &value, sizeof(value));
        } while (written == -1 && errno == EINTR);
        if (written != static_cast<ssize_t>(sizeof(value))) {
            const int savedErrno = errno;
            std::cerr << "Failed to signal the listening thread. Error: " << strerror(savedErrno) << std::endl;
        }
        if (listeningThread_.joinable()) {
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }
        // Remove both descriptors from the epoll instance before closing them.
        epoll_event event{};
        epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);
        epoll_ctl(epollfd_, EPOLL_CTL_DEL, eventfd_, &event);
        const int closedPipeFd = pipe_fd_;
        const int closedEpollFd = epollfd_;
        const int closedEventFd = eventfd_;
        closeDescriptors();
        std::cout << "pipe_fd_ ["<< closedPipeFd <<"] close " << std::endl;
        std::cout << "epollfd_ ["<< closedEpollFd <<"] close " << std::endl;
        std::cout << "eventfd_ ["<< closedEventFd <<"] close " << std::endl;
    }
private:
    // Closes `fd` when it is open and marks it as closed.
    static void closeAndReset(int& fd) {
        if (fd >= 0) {
            ::close(fd);
            fd = -1;
        }
    }
    // Closes every descriptor owned by the listener.
    void closeDescriptors() {
        closeAndReset(pipe_fd_);
        closeAndReset(eventfd_);
        closeAndReset(epollfd_);
    }
    // Logs a failed start (`message` followed by the text for `errorNumber`)
    // and releases whatever startListening() had opened so far.
    void abortStart(const std::string& message, int errorNumber) {
        std::cerr << message << strerror(errorNumber) << std::endl;
        closeDescriptors();
    }
    // Reads the pipe until it has no more data. Edge-triggered epoll only
    // reports a new event after the descriptor has been read dry, so a single
    // read() would strand anything beyond the first kBufferSize bytes.
    void drainPipe() {
        char buffer[kBufferSize];
        for (;;) {
            const ssize_t bytesRead = ::read(pipe_fd_, buffer, sizeof(buffer));
            if (bytesRead > 0) {
                if (dataCallback_) {
                    dataCallback_(std::string(buffer, static_cast<std::string::size_type>(bytesRead)));
                }
                continue;
            }
            if (bytesRead == -1 && errno == EINTR) {
                continue;  // interrupted by a signal: retry
            }
            return;  // 0 (no writer) or an error such as EAGAIN (no data left)
        }
    }
    // Thread body. Exits when the eventfd reports an event, i.e. when
    // stopListening() asked for a stop. It never reads stopListening_, so
    // that flag is only ever touched by the owning thread.
    void listenThread() {
        std::cout << "listenThread start " << pipePath_ << std::endl;
        bool stopRequested = false;
        while (!stopRequested) {
            epoll_event events[2];
            const int nfds = epoll_wait(epollfd_, events, 2, -1);
            if (nfds == -1) {
                if (errno == EINTR) {
                    continue;  // a signal is not a failure: wait again
                }
                const int savedErrno = errno;
                std::cerr << "epoll_wait failed. Error: " << strerror(savedErrno) << std::endl;
                break;  // a persistent error would otherwise spin forever
            }
            // Handle every reported event before leaving, so that pipe data
            // reported together with the stop request is still delivered.
            for (int i = 0; i < nfds; ++i) {
                const int readyFd = events[i].data.fd;
                if (readyFd == eventfd_) {
                    // Consume the counter, then leave after this batch.
                    uint64_t value = 0;
                    const ssize_t consumed = ::read(eventfd_, &value, sizeof(value));
                    (void)consumed;
                    stopRequested = true;
                } else if (readyFd == pipe_fd_ && (events[i].events & EPOLLIN)) {
                    drainPipe();
                }
            }
        }
        std::cout << "listenThread exit " << pipePath_ << std::endl;
    }
private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_ = true;  // the listener starts out stopped
    int epollfd_ = -1;           // -1 means "not open" for all three descriptors
    int eventfd_ = -1;
    int pipe_fd_ = -1;
};
// Demo: run two listen/stop cycles of 10 seconds each, 2 seconds apart.
int main() {
    // To test: once the program is running, run
    //   echo "bt_upgrade" > /home/woan/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/woan/tmp/test_pipe");

    // Prints the chunk's size, its raw text, and each byte in hexadecimal.
    const auto dumpPayload = [](std::string&& payload) {
        std::cout << "Received size: " << payload.size() << std::endl;
        std::cout << "Received data: " << payload << std::endl;
        std::cout << "Received data (hex): ";
        for (std::string::size_type i = 0; i < payload.size(); ++i) {
            const unsigned char byte = static_cast<unsigned char>(payload[i]);
            std::cout << std::hex << static_cast<int>(byte) << " ";
        }
        std::cout << std::dec << std::endl;
    };
    pipeListener.setDataCallback(dumpPayload);

    const std::chrono::seconds listenDuration(10);
    const std::chrono::seconds pauseDuration(2);

    // First cycle.
    pipeListener.startListening();
    std::cout << "main" << std::endl;
    std::this_thread::sleep_for(listenDuration);
    pipeListener.stopListening();

    std::this_thread::sleep_for(pauseDuration);

    // Second cycle: start again after a stop.
    pipeListener.startListening();
    std::this_thread::sleep_for(listenDuration);
    pipeListener.stopListening();
    return 0;
}
    			文章来源:https://blog.csdn.net/qq_42145185/article/details/134875506
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
    	本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!