Python - 进程操作共享内存【SharedMemory】

2023-12-14 21:11:28

一. 前言

Python 3.8引入了multiprocessing模块中的共享内存类SharedMemory。使用SharedMemory可以更方便地操作共享内存.

二. 示例代码

# -*- coding: utf-8 -*-
import csv
import inspect
import json
import os
import traceback
from multiprocessing import shared_memory


class BytesJSONEncoder(json.JSONEncoder):
    """用于解决json.dumps函数发现字典里面有bytes类型的数据无法编码问题"""

    def default(self, obj):
        if not isinstance(obj, bytes):
            return json.JSONEncoder.default(self, obj)
        return str(obj, encoding='utf-8')


class SharedMemoryObj:
    def __init__(self, name='shm_0', size=1024):
        try:
            self.shm = shared_memory.SharedMemory(name=name, create=True, size=size)  # 尝试创建共享内存,若失败则映射同名内存空间
        except:
            self.shm = shared_memory.SharedMemory(name=name, create=False)

        self.shm_name = self.shm.name
        self.length = 0
        self.contentbegin = 0

    def write(self, obj: dict) -> bool:
        """
        将待写内容通过Json序列化,写入共享内存。
        写入格式:内容长度 + ~ + 内容
        Args:
            obj: 待写内容

        Returns:

        """
        try:
            if obj is not None and obj != {}:
                obj_str = json.dumps(obj, cls=BytesJSONEncoder)
                obj_len = len(obj_str)
                content = str(obj_len) + "~" + obj_str
                b = content.encode()
                self.shm.buf[0:len(b)] = b
                return True
            else:
                return False
        except Exception as e:
            traceback.print_exc()
            return False

    def read(self) -> dict:
        """
        读取共享内存,返回字典格式内容
        Returns:dict

        """
        try:
            s = bytes(self.shm.buf[:20])
            index = str(s, 'utf-8').find("~")
            if index != -1:
                head = s[0:index]
                contentlength = int(head)
                content = bytes(self.shm.buf[index + 1:index + 1 + contentlength]).decode('utf-8')
                obj = json.loads(content, object_hook=lambda d: {int(k) if k.lstrip('-').isdigit() else k: v for k, v in
                                                                 d.items()})
                # obj = json.loads(content)
                return obj
            else:
                return {}
        except Exception as e:
            traceback.print_exc()
            return {}

    def update(self, obj):
        org_obj = self.read()
        org_obj.update(obj)
        self.write(org_obj)


def get_protol_config(path):
    return FileOpn.read_json(path)


FolderPath = os.path.dirname(
    os.path.abspath(inspect.getfile(inspect.currentframe())))


class FileOpn(object):
    @staticmethod
    def read_csv(file_path, mark):
        with open(FolderPath + file_path, newline='', encoding='utf-8-sig') as f:
            f_csv = csv.DictReader(f)
            new_dict = {}
            for row in f_csv:
                for i in row.keys():
                    if i == 'storage_id':
                        storage_id = int(row[i])
                        new_dict[storage_id] = {}
                        new_dict[storage_id][i] = storage_id
                        new_dict[storage_id][mark] = {}
                        continue
                    new_dict[storage_id][mark][i] = int(row[i])
        return new_dict

    @staticmethod
    def read_json(file_path):
        try:
            with open(FolderPath + file_path, encoding='utf-8') as json_data:
                reader = json.load(json_data)
                locals().update(reader)
        except Exception as e:
            reader = None
        return reader


if __name__ == '__main__':
    machine_config_shared_memory = SharedMemoryObj('sha_machine_config', 8 * 1024 * 1024)
    config_36 = "/protol_config_36c.json"
    config_test = "/test.json"
    get_config_36 = get_protol_config(config_36)
    get_config_test = get_protol_config(config_test)
    machine_config_shared_memory.update(get_config_36)
    machine_config_shared_memory.update(get_config_test)
    # machine_config_shared_memory.write(get_config_36)
    # machine_config_shared_memory.write(get_config_test)
    res = machine_config_shared_memory.read()
    print(res)

定义了一个SharedMemoryObj类,用于创建共享内存,提供存储和读取数据的方法。

  • write()方法中,先将待写内容通过Json序列化为字符串,然后写入共享内存;
  • read()方法中,先从共享内存中读取字节数据,按照写入格式进行划分,获取内容长度和内容,并将内容反序列化为字典格式返回;
  • update()方法中,先从共享内存中读取原有字典数据,对其进行更新,然后重新写入共享内存。

以上就是关于Python - 进程操作共享内存【SharedMemory】的基本介绍,希望对你有所帮助!

文章来源:https://blog.csdn.net/qq_43030934/article/details/134925966
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。