Linux C++ 监听管道文件的几种方式

2023-12-15 19:47:28

方式一(传统读取文件,一直监听循环读取文件)

以非阻塞方式打开管道文件,在循环中用 read 定时轮询读取;即使没有数据也要周期性唤醒线程,实时性和性能都不好

代码如下:

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <utility>

// Size in bytes of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;

/// Owning wrapper around a POSIX file descriptor: the descriptor is closed
/// when the wrapper is destroyed.
///
/// A negative value means "no descriptor" and is never passed to close().
/// Copying is disabled because two copies would both close the same
/// descriptor; ownership can be handed over with a move instead.
class FileDescriptor {
public:
    /// Takes ownership of `fd` (may be negative, e.g. a failed open()).
    explicit FileDescriptor(int fd) : fd_(fd) {}

    ~FileDescriptor() {
        closeIfOpen();
    }

    FileDescriptor(const FileDescriptor&) = delete;
    FileDescriptor& operator=(const FileDescriptor&) = delete;

    /// Transfers ownership; `other` is left holding no descriptor.
    FileDescriptor(FileDescriptor&& other) noexcept : fd_(other.fd_) {
        other.fd_ = -1;
    }

    /// Closes the currently held descriptor (if any), then takes over `other`'s.
    FileDescriptor& operator=(FileDescriptor&& other) noexcept {
        if (this != &other) {
            closeIfOpen();
            fd_ = other.fd_;
            other.fd_ = -1;
        }
        return *this;
    }

    /// Returns the raw descriptor without giving up ownership.
    int get() const {
        return fd_;
    }

private:
    // Closes the descriptor if one is held and marks the wrapper as empty.
    void closeIfOpen() {
        if (fd_ >= 0) {
            ::close(fd_);
            fd_ = -1;
        }
    }

    int fd_;
};

class PipeListener {
public:
    using DataCallback = std::function<void(std::string&&)>;

    PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
        if (access(pipePath.c_str(), F_OK) == -1) {
            if (mkfifo(pipePath.c_str(), 0666) == -1) {
                std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            }
        }
    }

    ~PipeListener() {
        stopListening();
    }

    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }

    void startListening() {
        stopListening_ = false;
        listeningThread_ = std::thread(&PipeListener::listenThread, this);
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }

    void stopListening() {
        stopListening_ = true;
        if (listeningThread_.joinable()) {
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }
    }

private:
    void listenThread() {
        auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));

        if (fd->get() < 0) {
            std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            return;
        }

        char buffer[kBufferSize];
        while (!stopListening_) {
            ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));

            if (bytesRead > 0) {
                std::string data(buffer, bytesRead);

                if (!data.empty() && dataCallback_) {
                    dataCallback_(std::move(data));
                }
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }

private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_;
};

// Demo entry point: listens on the pipe for 100 seconds and prints whatever arrives.
int main() {
    // How to test: once the program is running, execute
    //   echo "bt_upgrade" > /home/hello/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/hello/tmp/test_pipe");

    // Prints the size, the raw text and a hex dump of every received chunk.
    auto dumpPayload = [](std::string&& data) {
        std::cout << "Received size: " << data.size() << std::endl;
        std::cout << "Received data: " << data << std::endl;
        std::cout << "Received data (hex): ";
        for (const char byte : data) {
            std::cout << std::hex << static_cast<int>(static_cast<unsigned char>(byte)) << " ";
        }
        std::cout << std::dec << std::endl;
    };
    pipeListener.setDataCallback(dumpPayload);

    pipeListener.startListening();

    std::cout << "main" << std::endl;

    const std::chrono::seconds listenWindow(100);
    std::this_thread::sleep_for(listenWindow);
    pipeListener.stopListening();

    return 0;
}

方式二(用epoll监听管道文件内容)

缺点:停止监听或退出程序时,监听线程会一直阻塞在 epoll_wait 上,要等管道收到下一条消息之后才能正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring> 

// Size in bytes of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;

/// Owning wrapper around a POSIX file descriptor: the descriptor is closed
/// (and a trace line is printed) when the wrapper is destroyed.
///
/// A negative value means "no descriptor" and is never passed to close().
/// Copying is disabled because two copies would both close the same
/// descriptor; ownership can be handed over with a move instead.
class FileDescriptor {
public:
    /// Takes ownership of `fd` (may be negative, e.g. a failed open()).
    explicit FileDescriptor(int fd) : fd_(fd) {}

    ~FileDescriptor() {
        closeIfOpen();
    }

    FileDescriptor(const FileDescriptor&) = delete;
    FileDescriptor& operator=(const FileDescriptor&) = delete;

    /// Transfers ownership; `other` is left holding no descriptor.
    FileDescriptor(FileDescriptor&& other) noexcept : fd_(other.fd_) {
        other.fd_ = -1;
    }

    /// Closes the currently held descriptor (if any), then takes over `other`'s.
    FileDescriptor& operator=(FileDescriptor&& other) noexcept {
        if (this != &other) {
            closeIfOpen();
            fd_ = other.fd_;
            other.fd_ = -1;
        }
        return *this;
    }

    /// Returns the raw descriptor without giving up ownership.
    int get() const {
        return fd_;
    }

private:
    // Closes the descriptor if one is held and marks the wrapper as empty.
    void closeIfOpen() {
        if (fd_ >= 0) {
            std::cout << "close fd_ " << std::endl;
            ::close(fd_);
            fd_ = -1;
        }
    }

    int fd_;
};

class PipeListener {
public:
    using DataCallback = std::function<void(std::string&&)>;

    PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {
        if (access(pipePath.c_str(), F_OK) == -1) {
            if (mkfifo(pipePath.c_str(), 0666) == -1) {
                std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            }
        }
    }

    ~PipeListener() {
        stopListening();
        std::cout << "close epollfd_ " << std::endl;
        close(epollfd_);
    }

    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }

    void startListening() {
        stopListening_ = false;
        listeningThread_ = std::thread(&PipeListener::listenThread, this);
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }

    void stopListening() {
        stopListening_ = true;
        if (listeningThread_.joinable()) {
            std::cout << " wait Stopped listening on pipe: " << pipePath_ << std::endl;
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }
    }

private:
    void listenThread() {
        auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));

        if (fd->get() < 0) {
            std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;
            return;
        }

        int epollfd_ = epoll_create(1);
        if (epollfd_ == -1) {
            std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;
            return;
        }

        struct epoll_event event;
        event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered mode
        event.data.fd = fd->get();

        if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd->get(), &event) == -1) {
            std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;
            return;
        }

        char buffer[kBufferSize];
        while (!stopListening_) {
            struct epoll_event events[1];
            int nfds = epoll_wait(epollfd_, events, 1, -1);

            if (nfds == -1) {
                std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;
                break;
            }

            for (int i = 0; i < nfds; ++i) {
                if (events[i].events & EPOLLIN) {
                    ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));

                    if (bytesRead > 0) {
                        std::string data(buffer, bytesRead);

                        if (!data.empty() && dataCallback_) {
                            dataCallback_(std::move(data));
                        }
                    }
                }
            }
        }

    }

private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_;
    int epollfd_;  // 新增的成员变量用于保存 epollfd_
};

// Demo entry point: starts the listener, waits one second and returns.
int main() {
    // How to test: once the program is running, execute
    //   echo "bt_upgrade" > /home/hello/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/hello/tmp/test_pipe");

    // Prints the size, the raw text and a hex dump of every received chunk.
    pipeListener.setDataCallback([](std::string&& data) {
        std::cout << "Received size: " << data.size() << std::endl;
        std::cout << "Received data: " << data << std::endl;
        std::cout << "Received data (hex): ";
        for (std::string::size_type i = 0; i < data.size(); ++i) {
            const auto byte = static_cast<unsigned char>(data[i]);
            std::cout << std::hex << static_cast<int>(byte) << " ";
        }
        std::cout << std::dec << std::endl;
    });

    pipeListener.startListening();

    std::cout << "main" << std::endl;

    const auto settleTime = std::chrono::seconds(1);
    std::this_thread::sleep_for(settleTime);
    // stopListening() is deliberately not called here; the PipeListener
    // destructor calls it when pipeListener goes out of scope.
    std::cout << "main return" << std::endl;

    return 0;
}

方式三(用epoll监听管道文件内容,并用eventfd唤醒退出)

用epoll监听管道文件内容,同时把一个 eventfd 也加入 epoll;对象析构或者停止监听时向 eventfd 写入数据来唤醒 epoll_wait,从而释放资源、正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <condition_variable>
#include <sys/eventfd.h>
#include <cstring>  

// Size in bytes of the stack buffer handed to each read() call on the pipe.
constexpr int kBufferSize = 256;

/// Watches a named pipe (FIFO) from a background thread using epoll, with an
/// eventfd registered alongside the pipe so that stopListening() can wake the
/// thread up immediately.
///
/// Lifecycle: startListening() opens the pipe, the epoll instance and the
/// eventfd and starts the thread; stopListening() signals the eventfd, joins
/// the thread and closes all three descriptors. The pair can be repeated.
///
/// Threading: startListening()/stopListening()/setDataCallback() are meant to
/// be called from one controlling thread. The callback runs on the listener
/// thread and must be installed while the listener is stopped.
class PipeListener {
public:
    /// Called on the listener thread with each chunk of bytes read from the pipe.
    using DataCallback = std::function<void(std::string&&)>;

    /// Creates the FIFO at `pipePath` unless something already exists there.
    /// A creation failure is only reported on stderr; startListening() will
    /// then report the follow-up open() failure.
    PipeListener(const std::string& pipePath) : pipePath_(pipePath) {
        // Call mkfifo() directly and tolerate EEXIST instead of probing with
        // access() first, which leaves a window between check and creation.
        if (mkfifo(pipePath_.c_str(), 0666) == -1 && errno != EEXIST) {
            const int err = errno;  // Saved before stream output can overwrite errno.
            std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(err) << std::endl;
        }
    }

    /// Stops the listener (if running) and releases its descriptors.
    ~PipeListener() {
        stopListening();
    }

    /// Installs the function that receives the data read from the pipe.
    void setDataCallback(DataCallback callback) {
        dataCallback_ = std::move(callback);
    }

    /// Opens all descriptors and starts the listener thread.
    /// Does nothing if the listener is already running. If any step fails, the
    /// error is printed, everything opened so far is closed again and the
    /// listener stays in the stopped state so that a later retry is possible.
    void startListening() {
        if (!stopListening_) {
            std::cout << "already start Listening " << std::endl;
            return;
        }

        if (!openDescriptors()) {
            closeDescriptors();
            return;
        }

        try {
            listeningThread_ = std::thread(&PipeListener::listenThread, this);
        } catch (...) {
            closeDescriptors();  // Do not leak descriptors if the thread cannot start.
            throw;
        }

        // Only mark as running once the thread really exists.
        stopListening_ = false;
        std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;
    }

    /// Wakes the listener thread through the eventfd, waits for it to exit and
    /// closes all descriptors. Does nothing if the listener is not running.
    void stopListening() {
        if (stopListening_) {
            std::cout << "already stop Listening " << std::endl;
            return;
        }
        stopListening_ = true;

        // Add 1 to the eventfd counter; this makes it readable, which wakes up
        // epoll_wait() in the listener thread.
        const uint64_t value = 1;
        ssize_t written;
        do {
            written = ::write(eventfd_, &value, sizeof(value));
        } while (written == -1 && errno == EINTR);
        if (written != static_cast<ssize_t>(sizeof(value))) {
            const int err = errno;
            std::cerr << "Failed to write to eventfd. Error: " << strerror(err) << std::endl;
        }

        if (listeningThread_.joinable()) {
            listeningThread_.join();
            std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;
        }

        // Closing the epoll descriptor discards its registrations, so no
        // explicit EPOLL_CTL_DEL is needed.
        closeDescriptors();
    }

private:
    /// Opens the pipe, the epoll instance and the eventfd and registers both
    /// the pipe and the eventfd with epoll. Returns false (after printing the
    /// reason) on the first failure; the caller closes whatever was opened.
    bool openDescriptors() {
        pipe_fd_ = ::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);
        if (pipe_fd_ == -1) {
            const int err = errno;
            std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(err) << std::endl;
            return false;
        }
        std::cout << "startListening pipe_fd_ ["<< pipe_fd_ <<"] " << std::endl;

        epollfd_ = epoll_create(1);
        if (epollfd_ == -1) {
            const int err = errno;
            std::cerr << "Failed to create epoll instance. Error: " << strerror(err) << std::endl;
            return false;
        }
        std::cout << "startListening epollfd_ ["<< epollfd_ <<"] " << std::endl;

        // eventfd used purely as a wake-up signal for stopListening().
        eventfd_ = eventfd(0, EFD_NONBLOCK);
        if (eventfd_ == -1) {
            const int err = errno;
            std::cerr << "Failed to create eventfd. Error: " << strerror(err) << std::endl;
            return false;
        }
        std::cout << "startListening eventfd_ ["<< eventfd_ <<"] " << std::endl;

        return registerWithEpoll(pipe_fd_, "file descriptor") && registerWithEpoll(eventfd_, "eventfd");
    }

    /// Adds `fd` to the epoll instance for edge-triggered read readiness.
    /// `what` only names the descriptor in the error message.
    bool registerWithEpoll(int fd, const char* what) {
        struct epoll_event event{};
        event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered mode
        event.data.fd = fd;

        if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &event) == -1) {
            const int err = errno;
            std::cerr << "Failed to add " << what << " to epoll. Error: " << strerror(err) << std::endl;
            return false;
        }
        return true;
    }

    /// Closes `fd` if it is open, prints a trace line and resets it to -1.
    static void closeDescriptor(int& fd, const char* label) {
        if (fd < 0) {
            return;
        }
        ::close(fd);
        std::cout << label << " ["<< fd <<"] close " << std::endl;
        fd = -1;
    }

    /// Closes every descriptor that is currently open.
    void closeDescriptors() {
        closeDescriptor(pipe_fd_, "pipe_fd_");
        closeDescriptor(epollfd_, "epollfd_");
        closeDescriptor(eventfd_, "eventfd_");
    }

    /// Reads the pipe until it is empty and forwards every chunk to the callback.
    /// Required by edge-triggered mode: readiness is reported only once per
    /// change, so data left behind would stay unread until the next write.
    void drainPipe() {
        char buffer[kBufferSize];
        for (;;) {
            const ssize_t bytesRead = ::read(pipe_fd_, buffer, sizeof(buffer));
            if (bytesRead > 0) {
                if (dataCallback_) {
                    dataCallback_(std::string(buffer, static_cast<std::string::size_type>(bytesRead)));
                }
                continue;
            }
            if (bytesRead < 0 && errno == EINTR) {
                continue;
            }
            return;  // 0 (no writer) or EAGAIN (drained) or a read error.
        }
    }

    /// Thread body: waits for pipe data or the stop signal.
    /// The stop request is detected through the eventfd event itself, so this
    /// thread never reads stopListening_. All events of a batch are handled
    /// before exiting, so data that arrived together with the stop request is
    /// still delivered.
    void listenThread() {
        std::cout << "listenThread start " << pipePath_ << std::endl;
        bool stopRequested = false;
        while (!stopRequested) {
            struct epoll_event events[2];
            const int nfds = epoll_wait(epollfd_, events, 2, -1);

            if (nfds == -1) {
                if (errno == EINTR) {
                    continue;  // Interrupted by a signal: not an error, wait again.
                }
                const int err = errno;
                std::cerr << "epoll_wait failed. Error: " << strerror(err) << std::endl;
                break;  // A persistent error would otherwise spin forever.
            }

            for (int i = 0; i < nfds; ++i) {
                const int readyFd = events[i].data.fd;
                if (readyFd == eventfd_) {
                    // No need to read the counter: the eventfd is closed and
                    // recreated around every start/stop cycle.
                    stopRequested = true;
                } else if (readyFd == pipe_fd_) {
                    drainPipe();
                }
            }
        }
        std::cout << "listenThread exit " << pipePath_ << std::endl;
    }

private:
    std::string pipePath_;
    DataCallback dataCallback_;
    std::thread listeningThread_;
    bool stopListening_ = true; // Stopped by default; only touched by the controlling thread.
    int epollfd_ = -1;          // -1 means "not open" for all three descriptors.
    int eventfd_ = -1;
    int pipe_fd_ = -1;
};

// Demo entry point: runs two listen/stop cycles of 10 seconds each, with a
// 2 second pause in between, printing whatever arrives on the pipe.
int main() {
    // How to test: once the program is running, execute
    //   echo "bt_upgrade" > /home/woan/tmp/test_pipe
    // in a terminal to trigger the callback.
    PipeListener pipeListener("/home/woan/tmp/test_pipe");

    // Prints the size, the raw text and a hex dump of every received chunk.
    const auto printChunk = [](std::string&& data) {
        std::cout << "Received size: " << data.size() << std::endl;
        std::cout << "Received data: " << data << std::endl;
        std::cout << "Received data (hex): ";
        for (auto it = data.begin(); it != data.end(); ++it) {
            std::cout << std::hex << static_cast<int>(static_cast<unsigned char>(*it)) << " ";
        }
        std::cout << std::dec << std::endl;
    };
    pipeListener.setDataCallback(printChunk);

    const std::chrono::seconds listenWindow(10);
    const std::chrono::seconds pauseBetweenRuns(2);

    // First run.
    pipeListener.startListening();
    std::cout << "main" << std::endl;
    std::this_thread::sleep_for(listenWindow);
    pipeListener.stopListening();

    // Second run, to show that the listener can be restarted.
    std::this_thread::sleep_for(pauseBetweenRuns);
    pipeListener.startListening();
    std::this_thread::sleep_for(listenWindow);
    pipeListener.stopListening();

    return 0;
}

文章来源:https://blog.csdn.net/qq_42145185/article/details/134875506
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。