from __future__ import annotations
import contextlib
import functools
import shutil
from pathlib import Path
from typing import Any, cast
import numpy as np
@contextlib.contextmanager
def _ply_stream(path: Path, binary: bool = False):
"""Context manager for streaming PLY data to a file."""
total_points = 0
mode = "wb" if binary else "w"
with open(path, mode) as f:
if binary:
f.write(b"ply\nformat binary_little_endian 1.0\n")
count_pos = f.tell()
f.write(f"element vertex {' ' * 15}\n".encode())
f.write(b"property float x\nproperty float y\nproperty float z\n")
f.write(b"property uchar red\nproperty uchar green\nproperty uchar blue\n")
f.write(b"end_header\n")
else:
f.write("ply\nformat ascii 1.0\n")
count_pos = f.tell()
# element vertex {placeholder} - 15 spaces allows for up to 999 trillion points
f.write(f"element vertex {' ' * 15}\n")
f.write("property float x\nproperty float y\nproperty float z\n")
f.write("property uchar red\nproperty uchar green\nproperty uchar blue\n")
f.write("end_header\n")
def write_points(points: np.ndarray, colors: np.ndarray):
nonlocal total_points
if binary:
num = len(points)
dtype: np.dtype = np.dtype([("pos", "f4", (3,)), ("color", "u1", (3,))])
data: np.ndarray = np.empty(num, dtype=dtype)
data["pos"] = points.astype(np.float32)
data["color"] = colors[..., :3].astype(np.uint8)
f.write(data.tobytes())
else:
for (px, py, pz), (r, g, b, *_) in zip(points, colors):
f.write(f"{px} {py} {pz} {int(r)} {int(g)} {int(b)}\n")
total_points += len(points)
yield write_points
if total_points > 0:
f.seek(count_pos)
msg = f"element vertex {total_points:<15d}"
if binary:
f.write(msg.encode())
else:
f.write(msg)
else:
path.unlink(missing_ok=True)
def convert(
    input_dir: Path,
    output_dir: Path | None = None,
    force: bool = False,
) -> None:
    """Convert a ``.db`` dataset to a ``.json`` one, or the other way around.

    Args:
        input_dir: directory in which to look for dataset
        output_dir: directory in which to save new dataset.
            If not set, save new metadata file in same directory,
            otherwise copy over all data to a new directory.
        force: if true, overwrite output file(s) if present
    """
    from visionsim.dataset import Metadata

    if output_dir:
        if input_dir.resolve() == output_dir.resolve():
            raise RuntimeError("Input and output directory cannot be the same!")
        if output_dir.exists() and not force:
            raise FileExistsError("Output directory already exists.")
        # Either absent (no-op) or force was given: clear the destination
        shutil.rmtree(output_dir, ignore_errors=True)

    metadata = Metadata.from_path(input_dir)
    assert metadata.path is not None
    source_rel = metadata.path.relative_to(input_dir.resolve())
    # Flip the extension: .json becomes .db and anything else becomes .json
    target_suffix = ".db" if metadata.path.suffix == ".json" else ".json"
    target_rel = source_rel.with_suffix(target_suffix)

    if output_dir:
        # Copy the whole dataset, then swap in the converted metadata file
        shutil.copytree(input_dir, output_dir)
        metadata.save(output_dir / target_rel)
        (output_dir / source_rel).unlink(missing_ok=True)
    else:
        # In-place: just write the converted metadata next to the original
        metadata.save(input_dir / target_rel)
def merge(input_files: list[Path], names: list[str] | None = None, output_file: Path = Path("combined.json")) -> None:
    """Merge one or more dataset metadata files into a single one.

    There is typically one dataset file per data type (frames, depth, etc),
    but compatible datasets (same number of frames, same camera, etc) can be
    combined into Nerfstudio-compatible "transform.json" files that carry e.g.
    a "depth_file_path" or "mask_path" alongside a "file_path". Only the
    transforms files are modified; the underlying data is left untouched.

    Use cases: rename the data type of a single file, merge several metadata
    files whose data type names are already distinct, or merge-and-rename many
    metadata files at once.

    Args:
        input_files (list[Path]): Datasets to merge; each entry may be a metadata file or its directory.
        names (list[str] | None, optional): What to rename each "path" argument to. Defaults to None.
        output_file (Path, optional): Where to save metadata file, should be a ``.json`` file. Defaults to "combined.json".
    """
    import more_itertools as mitertools

    from visionsim.dataset import Metadata

    metas = [Metadata.from_path(p) for p in input_files]
    all_types = [dt for m in metas for dt in m.data_types]

    # -- Compatibility checks -------------------------------------------------
    if {len(m.cameras or []) for m in metas} != {1}:
        raise ValueError("Cannot merge datasets that have multiple cameras.")
    # Mask out channel count (`c`) so data types with a different number of
    # channels can still share a camera
    distinct_cameras = {cam.model_copy(update=dict(c=None)) for m in metas for cam in (m.cameras or [])}
    if len(distinct_cameras) != 1:
        raise ValueError("Cannot merge datasets that have different cameras.")
    if len({len(m) for m in metas}) != 1:
        raise ValueError("Datasets cannot be merged as they are not the same size.")

    if names is None:
        if len(set(all_types)) != len(all_types):
            raise ValueError(f"Data types must be unique, got {all_types}.")
        # Types are already unique (and a single file may hold several), so
        # disable renaming altogether
        selected_types: list = [None] * len(input_files)
        names = [None] * len(input_files)  # type: ignore
    else:
        if len(names) != len(input_files):
            raise ValueError(
                f"Expected as many names as input files, got {len(names)} and {len(input_files)} respectively."
            )
        if len(input_files) != len(all_types):
            raise ValueError("Cannot rename data types when multiple types exist in a file.")
        selected_types = all_types

    def _combine(group):
        # Cameras, sizes and type uniqueness were validated above; here we only
        # verify shared poses and the absence of per-frame extras, then union.
        for left, right in mitertools.pairwise(group):
            if not np.allclose(left["transform_matrix"], right["transform_matrix"]):
                raise ValueError("Frames in datasets do not share a common pose.")
        if any(t.get("offset") or t.get("bitpack_dim") for t in group):
            raise ValueError("Cannot merge datasets that use additional per-frame attributes.")
        return functools.reduce(lambda a, b: a | b, group)

    destination = output_file.resolve().parent
    streams = [
        m.iter_dense_transforms(data_type=dt, rename_to=name, relative_to=destination)
        for m, dt, name in zip(metas, selected_types, names)
    ]
    Metadata.from_dense_transforms([_combine(group) for group in zip(*streams)]).save(output_file)
def to_pointcloud(
    colors: Path,
    depths: Path | None = None,
    points: Path | None = None,
    output: Path = Path("pointcloud.ply"),
    p: float = 0.15,
    binary: bool = True,
    force: bool = False,
) -> None:
    """Generate a ``.ply`` point cloud from datasets.

    Args:
        colors: path to dataset to use for point colors, must contain RGB data that is assumed to be in uint8.
        depths: path to dataset to use for depth-based 3D points. A pinhole camera model is used to project
            depth values to 3D points.
        points: path to dataset to use for world-space 3D points. If set, this will be used
            instead of depth-based points.
        output: path to save PLY file to.
        p: probability of sampling a pixel.
        binary: If true, save as a binary PLY file (smaller and faster).
        force: If true, overwrite output file if present.

    Raises:
        FileExistsError: if ``output`` exists and ``force`` is false.
        ValueError: if ``output`` is not a ``.ply`` file, the datasets are
            incompatible, or neither ``depths`` nor ``points`` is given.
    """
    from rich.progress import track

    from visionsim.cli import _log
    from visionsim.dataset import Dataset

    # Validate the output name *before* touching the filesystem or loading
    # any data, so an invalid call cannot delete an existing file.
    if output.suffix != ".ply":
        raise ValueError(f"Output file must have a .ply extension, instead got {output.suffix}.")
    if output.exists() and not force:
        raise FileExistsError("Output file already exists.")
    else:
        output.unlink(missing_ok=True)

    # Filter out large depths, this is a render bug in CYCLES
    # See: https://blender.stackexchange.com/questions/325007
    DEPTH_CUTOFF = 10000000000

    # Fixed seed keeps the pixel subsampling reproducible across runs
    np.random.seed(76986489)
    ds_colors = Dataset.from_path(colors)
    num_frames = len(ds_colors)
    ds_points = Dataset.from_path(points) if points else None
    ds_depths = Dataset.from_path(depths) if depths else None

    if (ds_points and len(ds_points) != num_frames) or (ds_depths and len(ds_depths) != num_frames):
        raise ValueError("Colors and depths/points datasets must have the same number of frames.")

    # Compare cameras with their channel count (`c`) masked out, since the
    # data types legitimately differ in number of channels.
    canonical_cameras = [
        [cam.model_copy(update=dict(c=None)) for cam in (m.cameras or [])]
        for m in [ds_colors, ds_points, ds_depths]
        if m is not None
    ]
    # Compare every dataset's camera list against the first one. (The previous
    # `for c1, c2 in zip(*...)` unpack crashed when all three datasets were
    # supplied, since zip then yields 3-tuples.)
    reference_cameras = canonical_cameras[0]
    if any(cams != reference_cameras for cams in canonical_cameras[1:]):
        raise ValueError("Cannot generate point cloud from datasets with different cameras.")

    _log.info(f"Generating point cloud from {num_frames} frames (p={p})...")

    with _ply_stream(output, binary=binary) as write_points:
        for i in track(range(num_frames), description="Processing frames..."):
            colors_data, _ = cast(tuple[np.ndarray, dict[str, Any]], ds_colors[i])
            # Prefer explicit world-space points over depth when both are given
            if ds_points is not None:
                points_data, points_meta = cast(tuple[np.ndarray, dict[str, Any]], ds_points[i])
            elif ds_depths is not None:
                points_data, points_meta = cast(tuple[np.ndarray, dict[str, Any]], ds_depths[i])
            else:
                raise ValueError("Either `points` or `depths` must be provided.")

            if points_data.shape[:2] != colors_data.shape[:2]:
                # Skip (but report) frames whose resolutions disagree
                _log.warning(
                    f"Points and colors must have same resolution for frame {i}, "
                    f"got {points_data.shape[:2]} and {colors_data.shape[:2]}."
                )
                continue

            # Randomly subsample pixels with probability `p`
            h, w = points_data.shape[:2]
            mask = np.random.rand(h, w) < p
            points_sampled = points_data[mask]
            colors_sampled = colors_data[mask]

            if ds_points is None:
                # Data came from the depth dataset: project depth to world-space
                depth = points_sampled[..., 0]
                # Filter out zero depth and CYCLES' bogus "infinite" depths
                valid_mask = (depth > 0) & (depth < DEPTH_CUTOFF)
                depth = depth[valid_mask]
                colors_sampled = colors_sampled[valid_mask]
                # Pixel coordinates of the sampled-and-valid pixels
                yy, xx = np.where(mask)
                yy = yy[valid_mask]
                xx = xx[valid_mask]
                # Intrinsics
                fl_x = points_meta["fl_x"]
                fl_y = points_meta["fl_y"]
                cx = points_meta["cx"]
                cy = points_meta["cy"]
                # Camera space points (Blender/OpenGL: +X right, +Y up, -Z forward)
                # So depth is -Z. Y is inverted between image and camera space (up is positive)
                z = -depth
                x = -(xx - cx) * z / fl_x
                y = (yy - cy) * z / fl_y
                # World space: homogeneous transform, then dehomogenize
                c2w = np.array(points_meta["transform_matrix"])
                points_cam = np.stack([x, y, z, np.ones_like(x)], axis=-1)
                points_world = (c2w @ points_cam.T).T
                points_world = points_world[:, :3] / points_world[:, -1:]
            else:
                # Assume points are already in world-space
                points_world = points_sampled

            # Filter out zero points (often background in position pass if not careful)
            points_x, points_y, points_z = points_world.T
            valid_mask = (points_x != 0) | (points_y != 0) | (points_z != 0)
            points_world = points_world[valid_mask]
            colors_sampled = colors_sampled[valid_mask]

            # Write points for this frame to file
            write_points(points_world, colors_sampled)