Reading the match results from COLMAP's database.db and writing them to images.txt and points3D.txt

2023-12-26 23:58:28


A model exported by COLMAP contains images.txt and points3D.txt. Recently, however, I have been working on aerial triangulation of low-overlap imagery: COLMAP cannot recover the extrinsics of all images, and the reconstructed sparse point cloud covers only part of the block, so the exported images.txt and points3D.txt cannot be used for a global BA over the whole survey area. Still, I want to reuse my existing code that reads images.txt and points3D.txt to load the matches and continue with the later experiments (laziness is the original sin).
So I read the raw matches from database.db and write them into images.txt and points3D.txt; that way my existing code does not need to change.
The script is a modification of scripts\python\export_inlier_matches.py provided in the COLMAP source tree; the modified code is below:

import os
import argparse
import sqlite3
import cv2
import numpy as np

class FeaturePoint:
    def __init__(self, x, y, point3D_id=-1):
        self.x = x
        self.y = y
        self.point3D_id = point3D_id

class Observation:
    def __init__(self, image_id, point2D_idx):
        self.image_id = image_id
        self.point2D_idx = point2D_idx

class Point3D:
    def __init__(self, x=0, y=0, z=0):
        self.x = x
        self.y = y
        self.z = z
        self.observations = []

    def add_observation(self, observation):
        self.observations.append(observation)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--database_path", required=True, help="Path to the database")
    parser.add_argument("--output_path", required=True, help="Name of the output directory")
    parser.add_argument("--image_path", required=True, help="Directory containing the source images")
    parser.add_argument("--min_num_matches", type=int, default=15)
    args = parser.parse_args()
    return args


def pair_id_to_image_ids(pair_id):
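    # COLMAP encodes an image pair as pair_id = image_id1 * 2147483647 + image_id2,
    # so the two image ids can be recovered with modulo and integer division.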
    image_id2 = pair_id % 2147483647
    image_id1 = (pair_id - image_id2) // 2147483647
    return image_id1, image_id2

def get_keypoints(cursor, image_id):
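    # Each keypoints row stores one float32 blob of shape (n_rows, n_columns);
    # the first two columns are the x, y image coordinates, any remaining
    # columns describe the affine shape of the feature.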
    cursor.execute("SELECT * FROM keypoints WHERE image_id = ?;", (image_id,))        
    _, n_rows, n_columns, raw_data = cursor.fetchone()
    keypoints = np.frombuffer(raw_data, dtype=np.float32).reshape(n_rows, n_columns)
    return keypoints[:, :2]  # Assuming first two columns are x, y coordinates


def process_matches(matches, keypoints):
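    # Greedily chain the pairwise matches into tracks: every matched feature is
    # assigned a point3D_id, and features linked by a match share the same id.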
    points3Ds = {}
    featurePoints = {}
    next_point3D_id = 1

    for pair_id, match_data in matches.items():
        id1, id2 = pair_id_to_image_ids(pair_id)
        for idxInA, idxInB in match_data:
            keyA = (id1, idxInA)
            keyB = (id2, idxInB)

            if keyA not in featurePoints:
                coordsA = keypoints[id1][idxInA]
                featurePoints[keyA] = FeaturePoint(coordsA[0], coordsA[1], -1)
            if keyB not in featurePoints:
                coordsB = keypoints[id2][idxInB]
                featurePoints[keyB] = FeaturePoint(coordsB[0], coordsB[1], -1)

            fpA = featurePoints[keyA]
            fpB = featurePoints[keyB]

            if fpA.point3D_id == -1 and fpB.point3D_id == -1:
                # Neither feature belongs to a track yet: start a new 3D point.
                newPoint3D = Point3D()
                point3D_id = next_point3D_id
                next_point3D_id += 1
                newPoint3D.add_observation(Observation(id1, idxInA))
                newPoint3D.add_observation(Observation(id2, idxInB))
                points3Ds[point3D_id] = newPoint3D
            elif fpA.point3D_id != -1 and fpB.point3D_id != -1:
                # Both features already belong to a track.
                point3D_id = fpA.point3D_id
                if fpB.point3D_id != point3D_id:
                    # The match links two different tracks: merge B's track into
                    # A's and re-point all of its features to the surviving id.
                    old_point = points3Ds.pop(fpB.point3D_id)
                    for obs in old_point.observations:
                        points3Ds[point3D_id].add_observation(obs)
                        featurePoints[(obs.image_id, obs.point2D_idx)].point3D_id = point3D_id
            else:
                # Exactly one of the two features already has a track: extend it.
                point3D_id = fpA.point3D_id if fpA.point3D_id != -1 else fpB.point3D_id
                point3D = points3Ds[point3D_id]
                if fpA.point3D_id == -1:
                    point3D.add_observation(Observation(id1, idxInA))
                if fpB.point3D_id == -1:
                    point3D.add_observation(Observation(id2, idxInB))

            fpA.point3D_id = point3D_id
            fpB.point3D_id = point3D_id

    return featurePoints,points3Ds

def save_images_txt(img_ids_to_names_dict, keypoints, featurePoints, output_path):
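    # Write the two-lines-per-image images.txt format. Only matches are known at
    # this stage, so the pose (QW QX QY QZ TX TY TZ) is written as zeros and CAMERA_ID as 1.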
    with open(os.path.join(output_path, 'images.txt'), 'w') as f:
        f.write(f"# Image list with two lines of data per image:\n")
        f.write(f"#   IMAGE_ID, QW, QX, QY, QZ, TX, TY, TZ, CAMERA_ID, NAME\n")
        f.write(f"#   POINTS2D[] as (X, Y, POINT3D_ID)\n")
        f.write(f"# Number of images: {len(img_ids_to_names_dict)}, mean observations per image: \n")

        for image_id in img_ids_to_names_dict:
            # Write the image line; rotation and translation are set to zero and the camera ID to 1.
            f.write(f"{image_id} 0 0 0 0 0 0 0 1 {img_ids_to_names_dict[image_id]}\n")
            # Write all 2D points of this image, each with the ID of its 3D point
            # (-1 if the keypoint has no matched 3D point).
            for idx, (x, y) in enumerate(keypoints[image_id]):
                fp = featurePoints.get((image_id, idx))
                point3D_id = fp.point3D_id if fp is not None else -1
                f.write(f"{x} {y} {point3D_id} ")
            f.write("\n")


def preload_images(image_path, img_ids_to_names_dict):
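    # Load every image once up front so the per-point color lookup below does
    # not re-read files from disk for each observation.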
    images = {}
    for image_id, img_name in img_ids_to_names_dict.items():
        img_file_name = os.path.join(image_path, img_name)
        images[image_id] = load_image(img_file_name)
    return images


def save_points3Ds_to_file(image_path, img_ids_to_names_dict, points3Ds, keypoints, output_path):
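    # Write points3D.txt. No triangulation has been done, so X, Y, Z stay 0;
    # the RGB color is averaged over the observing images, and ERROR is set to 0.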
    images = preload_images(image_path, img_ids_to_names_dict)

    with open(os.path.join(output_path, 'points3D.txt'), 'w') as f:
        f.write(f"# 3D point list with one line of data per point:\n")
        f.write(f"#   POINT3D_ID, X, Y, Z, R, G, B, ERROR, TRACK[] as (IMAGE_ID, POINT2D_IDX)\n")
        f.write(f"# Number of points: {len(points3Ds)}, mean track length: \n")
        for point3D_id, point in points3Ds.items():
            # Average the color of the 3D point over its observations.
            colors = []
            for obs in point.observations:
                image = images[obs.image_id]  # use the preloaded image
                if image is None:
                    continue
                coords = keypoints[obs.image_id][obs.point2D_idx]
                color = get_color_from_image(image, coords[0], coords[1])
                if color is not None:
                    colors.append(color)
            point.color = average_color(colors) if colors else (0, 0, 0)
            # Write the 3D point coordinates and color.
            f.write(f"{point3D_id} {point.x} {point.y} {point.z} {int(point.color[0])} {int(point.color[1])} {int(point.color[2])} 0 ")
            # Write the track (IMAGE_ID, POINT2D_IDX pairs).
            for obs in point.observations:
                f.write(f"{int(obs.image_id)} {obs.point2D_idx} ")
            f.write("\n")


def load_image(image_path):
    # Load the image with OpenCV (returns None if the file cannot be read).
    return cv2.imread(image_path, cv2.IMREAD_COLOR)

def get_color_from_image(image, x, y):
    # OpenCV indexes images as (y, x) and stores channels in BGR order.
    x, y = int(round(x)), int(round(y))
    if x < 0 or y < 0 or x >= image.shape[1] or y >= image.shape[0]:
        return None  # keypoint falls outside the image

    # Convert BGR to RGB before returning.
    b, g, r = image[y, x]
    return r, g, b

def average_color(colors):
    # Average a list of (r, g, b) tuples channel-wise.
    if not colors:
        return None
    avg_color = [sum(channel) / len(colors) for channel in zip(*colors)]
    return tuple(avg_color)


def main():
    args = parse_args()

    filename_db = args.database_path

    print("Opening database: " + filename_db)
    if not os.path.exists(filename_db):
        print('Error: database does not exist!')
        exit()

    if not os.path.exists(args.output_path):
        os.mkdir(args.output_path)

    # Connect to the database
    connection = sqlite3.connect(args.database_path)
    cursor = connection.cursor()


    list_image_ids = []
    img_ids_to_names_dict = {}
    # Extract image ids and keypoints
    cursor.execute('SELECT image_id, name, cameras.width, cameras.height FROM images LEFT JOIN cameras ON images.camera_id == cameras.camera_id;')
    for row in cursor:
        image_idx, name, width, height = row        
        list_image_ids.append(image_idx)
        img_ids_to_names_dict[image_idx] = name
    
    num_image_ids = len(list_image_ids)

    keypoints = {image_id: get_keypoints(cursor, image_id) for image_id in list_image_ids}

    # Extract matches
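    # Each row of two_view_geometries stores the geometrically verified inlier
    # matches of one image pair as a (rows, cols) uint32 array of keypoint index pairs.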
    cursor.execute('SELECT pair_id, rows, cols, data FROM two_view_geometries;')
    all_matches = {}
    for row in cursor:
        pair_id, rows, cols, raw_data = row
        # Skip pairs with too few verified matches.
        if rows < 5:
            continue

        matches = np.frombuffer(raw_data, dtype=np.uint32).reshape(rows, cols)
        all_matches[pair_id] = matches

    # Process matches
    featurePoints, points3Ds = process_matches(all_matches, keypoints)
    
    cursor.close()
    connection.close()

    save_images_txt(img_ids_to_names_dict, keypoints, featurePoints, args.output_path)
    save_points3Ds_to_file(args.image_path, img_ids_to_names_dict, points3Ds, keypoints, args.output_path)


if __name__ == "__main__":
    main()

At this point only the match results are available, so the 3D coordinates of the sparse points are all 0 and the image extrinsics are all 0 as well.
You can then use these matches to run a global BA yourself.
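
For reference, a minimal invocation sketch (the script name export_db_matches.py and the paths are placeholders for wherever you saved the code and data):

python export_db_matches.py --database_path /path/to/database.db --output_path /path/to/sparse_txt --image_path /path/to/images

The images.txt and points3D.txt written into --output_path follow COLMAP's text model layout (two lines per image, one line per 3D point), so existing code that parses that format can load the matches without modification.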

Source: https://blog.csdn.net/weixin_44153180/article/details/135232781