impak
=====

impak – space-efficient image collection format based on pixel-diff patches.

Quick start
-----------
Pack a folder of PNG variants into a single file:

    from pathlib import Path

    import impak

    with impak.create("variants.impak", mode="vs_first") as w:
        for path in sorted(Path("frames/").glob("*.png")):
            w.add(path, name=path.stem)

Read them back:

    with impak.open("variants.impak") as r:
        print(r.info())        # human-readable summary
        img = r[0]             # PIL Image, by index
        img = r["frame_01"]    # by name
        for img in r:          # iterate all
            img.show()
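A delta frame stores only the rectangular regions that changed relative to its reference. A minimal sketch of tile-based change detection under a pixel threshold (illustrative only; `changed_tiles` is not part of the impak API):

```python
import numpy as np

def changed_tiles(ref, new, threshold=4, tile_size=32):
    """Return (x, y, w, h) boxes of tiles whose max channel delta exceeds threshold."""
    delta = np.abs(new.astype(np.int16) - ref.astype(np.int16)).max(axis=2)
    h, w = delta.shape
    boxes = []
    for ty in range(0, h, tile_size):
        for tx in range(0, w, tile_size):
            tile = delta[ty:ty + tile_size, tx:tx + tile_size]
            if (tile > threshold).any():
                boxes.append((tx, ty, tile.shape[1], tile.shape[0]))
    return boxes

ref = np.zeros((64, 64, 4), dtype=np.uint8)   # blank RGBA canvas
new = ref.copy()
new[40, 40] = [255, 0, 0, 255]                # one changed pixel, lower-right tile
print(changed_tiles(ref, new))                # [(32, 32, 32, 32)]
```

Only the boxes returned here would be re-encoded; the rest of the frame is reconstructed from the reference.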
Diff modes
----------
| Mode | Description |
|------|-------------|
| ``lto`` *(default)* | Each frame searches all prior frames and picks the best reference (smallest patch payload), forming a DAG of deltas. A keyframe is stored when it's smaller than the best delta. Params: ``lto_candidates`` (default 6), ``max_ref_depth`` (default 8). |
| ``vs_first`` | Every frame is a patch against frame 0. |
| ``vs_prior`` | Every frame is a patch against the immediately prior frame. |
| ``keyframe`` | Like ``vs_prior`` but stores a full image every K frames. Param: ``keyframe_interval`` (default 10). |
| ``manual`` | User-designated baselines are stored first and searched before each content frame. Params: ``baselines`` (required), ``fallback_mode`` (default ``lto``). |
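The chain-style modes trade file size against decode cost: a `vs_prior` delta can only be materialised by replaying every frame back to frame 0, while `keyframe` bounds the replay length. A rough cost model of that difference (hypothetical, and ignoring any reader-side caching):

```python
def decode_chain_lengths(n_frames, mode, keyframe_interval=10):
    """Number of images that must be decoded to materialise each frame
    (hypothetical cost model for the fixed-reference modes only)."""
    depths = []
    for i in range(n_frames):
        if mode == "vs_first":
            depths.append(1 if i == 0 else 2)          # keyframe 0 plus one patch
        elif mode == "vs_prior":
            depths.append(i + 1)                       # replay the whole chain
        elif mode == "keyframe":
            depths.append(i % keyframe_interval + 1)   # back to the last keyframe
        else:
            raise ValueError(f"unmodelled mode: {mode}")
    return depths

print(decode_chain_lengths(5, "vs_prior"))    # [1, 2, 3, 4, 5]
print(max(decode_chain_lengths(25, "keyframe")))   # 10
```

`lto` sits in between: its `max_ref_depth` parameter caps the chain the same way `keyframe_interval` does here.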
Example:

    with impak.create(
        "out.impak",
        mode="manual",
        baselines=["day_bg.png", "night_bg.png"],
        fallback_mode="vs_prior",
    ) as w:
        for path in content_frames:
            w.add(path)
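In manual mode each content frame is scored against every baseline before any patch is computed. A standalone sketch of that ranking, assuming the "fraction of pixels within threshold" similarity described above (`similarity` here is illustrative, not the packaged `similarity_score`):

```python
import numpy as np

def similarity(a, b, threshold=4):
    """Fraction of pixels whose max channel delta stays within threshold."""
    diff = np.abs(a.astype(np.int16) - b.astype(np.int16)).max(axis=2)
    return float((diff <= threshold).sum()) / diff.size

day = np.zeros((8, 8, 4), dtype=np.uint8)       # stand-in "day" baseline
night = np.full((8, 8, 4), 200, dtype=np.uint8) # stand-in "night" baseline
frame = day.copy()
frame[0, 0] = 255                               # near-duplicate of the day baseline

scores = {"day": similarity(frame, day), "night": similarity(frame, night)}
print(max(scores, key=scores.get))              # day
```

The frame would then be patched against the winning baseline, falling back to `fallback_mode` only when no baseline produces a smaller payload.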
1""" 2impak – space-efficient image collection format based on pixel-diff patches. 3 4Quick start 5----------- 6Pack a folder of PNG variants into a single file: 7 8 import impak 9 10 with impak.create("variants.impak", mode="vs_first") as w: 11 for path in sorted(Path("frames/").glob("*.png")): 12 w.add(path, name=path.stem) 13 14Read them back: 15 16 with impak.open("variants.impak") as r: 17 print(r.info()) # human-readable summary 18 img = r[0] # PIL Image, by index 19 img = r["frame_01"] # by name 20 for img in r: # iterate all 21 img.show() 22 23Diff modes 24---------- 25| Mode | Description | 26|------|-------------| 27| ``lto`` *(default)* | Each frame searches all prior frames and picks the best reference (smallest patch payload), forming a DAG of deltas. A keyframe is stored when it's smaller than the best delta. Params: ``lto_candidates`` (default 6), ``max_ref_depth`` (default 8). | 28| ``vs_first`` | Every frame is a patch against frame 0. | 29| ``vs_prior`` | Every frame is a patch against the immediately prior frame. | 30| ``keyframe`` | Like ``vs_prior`` but stores a full image every K frames. Param: ``keyframe_interval`` (default 10). | 31| ``manual`` | User-designated baselines are stored first and searched before each content frame. Params: ``baselines`` (required), ``fallback_mode`` (default ``vs_prior``). | 32 33 Example: 34 35 with impak.create( 36 "out.impak", 37 mode="manual", 38 baselines=["day_bg.png", "night_bg.png"], 39 fallback_mode="vs_prior", 40 ) as w: 41 for path in content_frames: 42 w.add(path) 43 44""" 45 46from .encoder import ImpakWriter 47from .decoder import ImpakReader 48from .differ import compute_patches, reconstruct, similarity_score 49 50 51def create(path, mode="lto", **kwargs) -> ImpakWriter: 52 """ 53 Parameters 54 ---------- 55 path : file path (str or Path) 56 mode : "vs_first" | "vs_prior" | "keyframe" | "lto" | "manual" 57 codec : "png" or "webp" (default) 58 quality : 0-100. 100 = lossless for both codecs. 
59 Values <100 enable lossy WebP (smaller, not pixel-perfect). 60 keyframe_interval : (keyframe mode) full image every N frames (default 10) 61 threshold : pixel delta threshold 0-255 (default 4) 62 tile_size : diff grid tile size in px (default 32) 63 merge_gap : merge adjacent changed tiles within N px (default 8) 64 auto_keyframe_sim : force keyframe when similarity drops below this (default 0.5) 65 lto_candidates : (lto / manual mode) how many top-similarity frames to 66 fully probe (default 6) 67 max_ref_depth : (lto mode) max decode-chain depth before forcing a keyframe 68 (default 8) 69 baselines : (manual mode, required) list of PIL Images or file paths 70 stored as pinned keyframes at the start of the file; 71 every content frame is diffed against these first. 72 fallback_mode : (manual mode) strategy used when no baseline wins; 73 one of "vs_first", "vs_prior", "keyframe", "lto" 74 (default "vs_prior") 75 """ 76 return ImpakWriter(path, mode=mode, **kwargs) 77 78 79def open(path, low_ram_mode=False, cache_size=None, **kwargs) -> ImpakReader: 80 return ImpakReader( 81 path, 82 low_ram_mode=low_ram_mode, 83 cache_size=cache_size, 84 **kwargs, 85 ) 86 87 88__all__ = [ 89 "create", 90 "open", 91 "ImpakWriter", 92 "ImpakReader", 93 "compute_patches", 94 "reconstruct", 95 "similarity_score", 96] 97 98__version__ = "0.1.3"
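The writer serialises every patch as a small fixed-size binary header followed by its compressed payload. A sketch of what such a header could look like (the actual `pack_patch_header` layout is not shown here, so field order and widths are assumptions):

```python
import struct

# Hypothetical layout: x, y, w, h, payload length, each a little-endian uint32.
PATCH_HEADER = struct.Struct("<5I")

def pack_patch_header(x, y, w, h, payload_len):
    """Pack one patch header into 20 bytes (illustrative format)."""
    return PATCH_HEADER.pack(x, y, w, h, payload_len)

hdr = pack_patch_header(32, 32, 32, 32, 1024)
print(len(hdr), PATCH_HEADER.unpack(hdr))   # 20 (32, 32, 32, 32, 1024)
```

A fixed-size header lets the reader walk a frame's patch list without parsing the payloads themselves.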
class ImpakWriter:
    """
    Sequential writer for .impak collections.

    Parameters
    ----------
    path : output file path
    mode : "vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"
    keyframe_interval : (keyframe mode only) store a full image every N frames
    threshold : pixel delta considered "changed" (0 = perfectly lossless)
    tile_size : diff grid cell size in pixels
    merge_gap : pixels gap between changed tiles before they are merged
    auto_keyframe_sim : if similarity drops below this fraction a keyframe is
        forced regardless of mode (0.0 = never force)
    workers : thread-pool size used for parallel patch compression and
        LTO candidate probing. None = os.cpu_count().
        Set to 1 to disable parallelism entirely.
    baselines : (manual mode) list of PIL Images or file paths used as
        pinned reference anchors. They are stored as hidden
        leading keyframes so the delta chain is self-contained,
        but ImpakReader hides them from callers — only content
        frames are visible when iterating or indexing.
        Baselines are automatically resized to match the canvas
        size established by the first content frame (or the
        first baseline if all are the same size).
    fallback_mode : (manual mode) diff strategy applied when no baseline
        yields a patch set smaller than this alternative.
        Accepts "vs_first", "vs_prior", "keyframe", or "lto".
        Defaults to "lto".
    """

    def __init__(
        self,
        path: Union[str, Path],
        mode: str = "vs_first",
        keyframe_interval: int = 10,
        threshold: int = 4,
        tile_size: int = 64,
        merge_gap: int = 8,
        auto_keyframe_sim: float = 0.5,
        codec: str = "webp",
        quality: int = 100,
        lto_candidates: int = 6,
        max_ref_depth: int = 8,
        workers: Optional[int] = None,
        baselines: Optional[List[Union[Image.Image, str, Path]]] = None,
        fallback_mode: str = "lto",
    ):
        if mode not in MODE_FROM_NAME:
            raise ValueError(f"mode must be one of {list(MODE_FROM_NAME)}")
        if codec not in ("png", "webp"):
            raise ValueError("codec must be 'png' or 'webp'")
        if not (0 <= quality <= 100):
            raise ValueError("quality must be 0-100")
        if lto_candidates < 1:
            raise ValueError("lto_candidates must be >= 1")
        if max_ref_depth < 1:
            raise ValueError("max_ref_depth must be >= 1")

        if mode == "manual":
            _valid_fallbacks = [m for m in MODE_FROM_NAME if m != "manual"]
            if fallback_mode not in _valid_fallbacks:
                raise ValueError(
                    f"fallback_mode must be one of {_valid_fallbacks}"
                )
            if not baselines:
                raise ValueError(
                    "manual mode requires at least one entry in 'baselines'"
                )

        self.path = Path(path)
        self.mode = mode
        self.mode_id = MODE_FROM_NAME[mode]
        self.keyframe_interval = keyframe_interval
        self.threshold = threshold
        self.tile_size = tile_size
        self.merge_gap = merge_gap
        self.auto_keyframe_sim = auto_keyframe_sim
        self.codec = codec
        self.quality = quality
        self.lto_candidates = lto_candidates
        self.max_ref_depth = max_ref_depth
        self.workers = workers
        self.fallback_mode = fallback_mode
        self.fallback_mode_id = MODE_FROM_NAME.get(fallback_mode, MODE_LTO)

        self._frames: list[dict] = []
        self._frame_data: list[bytes] = []
        self._ref_images: list[Image.Image] = []
        self._ref_arrays: list[np.ndarray] = []
        self._depths: list[int] = []

        self._canvas: Optional[tuple[int, int]] = None
        self._size_groups: dict[tuple[int, int], list[int]] = {}
        self._closed = False

        self._baseline_ids: list[int] = []
        self._baseline_count: int = 0
        self._pending_baselines: list = list(baselines) if baselines else []

        _pool_size = workers if workers is not None else (os.cpu_count() or 4)
        self._pool = ThreadPoolExecutor(max_workers=max(1, _pool_size))

    def add(
        self,
        image: Union[Image.Image, str, Path],
        name: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> int:
        """
        Add one content image to the collection. Returns the 0-based frame
        index as seen by readers (i.e. not counting hidden baseline frames).

        *image* may be a Pillow Image or a file path.
        *name* is stored in per-frame metadata (useful for later retrieval).
        *metadata* is an arbitrary JSON-serialisable dict merged with name.
        """
        if self._closed:
            raise RuntimeError("Writer is already closed")

        if isinstance(image, (str, Path)):
            image = Image.open(image)

        image = image.convert("RGBA")
        new_arr = np.array(image, dtype=np.int16)

        if self._canvas is None:
            self._canvas = (image.width, image.height)

        if self._pending_baselines:
            self._inject_baselines(self._pending_baselines)
            self._pending_baselines = []

        frame_id = len(self._frames)

        frame_type, ref_id, patches = self._encode_frame(image, frame_id, new_arr)

        meta: dict = {}
        if name:
            meta["name"] = name
        if metadata:
            meta.update(metadata)

        meta["_size"] = [image.width, image.height]
        meta_bytes = json.dumps(meta, separators=(",", ":")).encode() if meta else b""

        parts: list[bytes] = []
        for (x, y, w, h, data) in patches:
            parts.append(pack_patch_header(x, y, w, h, len(data)))
            parts.append(data)
        parts.append(meta_bytes)
        frame_bytes = b"".join(parts)

        if frame_type == FRAME_KEYFRAME:
            self._depths.append(0)
        else:
            self._depths.append(self._depths[ref_id] + 1)

        self._frames.append({
            "patch_count": len(patches),
            "ref_frame_id": ref_id,
            "metadata_len": len(meta_bytes),
            "frame_type": frame_type,
        })
        self._frame_data.append(frame_bytes)
        self._ref_images.append(image)
        self._ref_arrays.append(new_arr)
        size_key = (image.width, image.height)
        self._size_groups.setdefault(size_key, []).append(frame_id)

        return frame_id - self._baseline_count

    def close(self):
        """Finalise and write the file. Called automatically by __exit__."""
        if self._closed:
            return
        if not self._frames:
            raise RuntimeError("No frames added — nothing to write")

        self._pool.shutdown(wait=False)

        w, h = self._canvas
        frame_count = len(self._frames)

        index_offset = FILE_HEADER_SIZE
        data_start = index_offset + frame_count * FRAME_INDEX_ENTRY_SIZE
        offsets: list[int] = []
        cursor = data_start
        for fd in self._frame_data:
            offsets.append(cursor)
            cursor += len(fd)

        self.path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.path, "wb") as f:
            f.write(pack_file_header(
                self.mode_id, frame_count, index_offset, w, h,
                codec=CODEC_FROM_NAME[self.codec],
                quality=self.quality,
            ))
            for i, fm in enumerate(self._frames):
                f.write(pack_index_entry(
                    data_offset=offsets[i],
                    patch_count=fm["patch_count"],
                    ref_frame_id=fm["ref_frame_id"],
                    metadata_len=fm["metadata_len"],
                    frame_type=fm["frame_type"],
                ))
            for fd in self._frame_data:
                f.write(fd)

        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.close()
        else:
            self._pool.shutdown(wait=False)
            self._closed = True
        return False

    @property
    def frame_count(self) -> int:
        """Total frames including hidden baselines."""
        return len(self._frames)

    @property
    def content_frame_count(self) -> int:
        """Content frames only (what callers see)."""
        return len(self._frames) - self._baseline_count

    @property
    def baseline_count(self) -> int:
        return self._baseline_count

    @property
    def stats(self) -> list[dict]:
        """Per-frame stats including hidden baseline frames."""
        result = []
        for i, (fm, fd) in enumerate(zip(self._frames, self._frame_data)):
            is_baseline = i in self._baseline_ids
            result.append({
                "frame_id": i,
                "content_id": None if is_baseline else i - self._baseline_count,
                "frame_type": "keyframe" if fm["frame_type"] == FRAME_KEYFRAME else "delta",
                "ref_frame_id": fm["ref_frame_id"],
                "patch_count": fm["patch_count"],
                "data_bytes": len(fd),
                "is_baseline": is_baseline,
            })
        return result

    def _inject_baselines(self, baselines: list):
        """
        Store each baseline as a hidden leading FRAME_KEYFRAME.

        Called from the first add() so self._canvas is already set.
        Baselines are resized to the canvas if their native size differs.
        """
        for idx, raw in enumerate(baselines):
            if isinstance(raw, (str, Path)):
                img = Image.open(raw).convert("RGBA")
            else:
                img = raw.convert("RGBA")

            # Resize with a high-quality filter so baseline deltas stay small.
            if img.size != self._canvas:
                img = img.resize(self._canvas, Image.LANCZOS)

            frame_id = len(self._frames)
            cw, ch = self._canvas
            compressed = _encode_crop(img, 0, 0, cw, ch, codec=self.codec, quality=self.quality)

            meta_bytes = json.dumps(
                {"_baseline": True, "baseline_index": idx},
                separators=(",", ":"),
            ).encode()

            parts: list[bytes] = [
                pack_patch_header(0, 0, cw, ch, len(compressed)),
                compressed,
                meta_bytes,
            ]
            frame_bytes = b"".join(parts)

            self._depths.append(0)
            self._frames.append({
                "patch_count": 1,
                "ref_frame_id": frame_id,
                "metadata_len": len(meta_bytes),
                "frame_type": FRAME_KEYFRAME,
            })
            self._frame_data.append(frame_bytes)
            self._ref_images.append(img)
            self._ref_arrays.append(np.array(img, dtype=np.int16))
            self._baseline_ids.append(frame_id)

        self._baseline_count = len(self._baseline_ids)

    def _encode_frame(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """Route to the appropriate encoder. Returns (frame_type, ref_id, patches)."""
        size_key = (image.width, image.height)
        same_size_ids = [fid for fid in self._size_groups.get(size_key, [])
                         if fid < frame_id]
        if not same_size_ids and frame_id > 0:
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_MANUAL:
            return self._encode_frame_manual(image, frame_id, new_arr)

        if frame_id == 0:
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_LTO:
            return self._encode_frame_lto(image, frame_id, new_arr, same_size_ids)

        if self.mode_id == MODE_KEYFRAME and (frame_id % self.keyframe_interval == 0):
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_VS_FIRST:
            ref_id = same_size_ids[0]
        elif self.mode_id == MODE_VS_PRIOR:
            ref_id = same_size_ids[-1]
        else:
            ref_id = same_size_ids[-1]

        return self._diff_against(image, frame_id, ref_id, new_arr)

    def _ref_chain_depth(self, frame_id: int) -> int:
        return self._depths[frame_id]

    def _make_keyframe(self, image: Image.Image, frame_id: int):
        w, h = image.size
        compressed = _encode_crop(image, 0, 0, w, h, codec=self.codec, quality=self.quality)
        return FRAME_KEYFRAME, frame_id, [(0, 0, w, h, compressed)]

    def _diff_against(self, image: Image.Image, frame_id: int, ref_id: int, new_arr: np.ndarray):
        if self.auto_keyframe_sim > 0:
            ref_arr = self._ref_arrays[ref_id]
            diff_arr = np.abs(new_arr - ref_arr).max(axis=2)
            total = ref_arr.shape[0] * ref_arr.shape[1]
            if (diff_arr <= self.threshold).sum() / total < self.auto_keyframe_sim:
                return self._make_keyframe(image, frame_id)

        patches = compute_patches(
            self._ref_images[ref_id], image,
            threshold=self.threshold,
            tile_size=self.tile_size,
            merge_gap=self.merge_gap,
            codec=self.codec,
            quality=self.quality,
            workers=self.workers,
            ref_arr=self._ref_arrays[ref_id],
            new_arr=new_arr,
        )
        return FRAME_DELTA, ref_id, patches

    def _encode_frame_lto(self, image: Image.Image, frame_id: int,
                          new_arr: np.ndarray,
                          same_size_ids: Optional[list] = None):
        total_px = new_arr.shape[0] * new_arr.shape[1]

        if same_size_ids is None:
            size_key = (image.width, image.height)
            same_size_ids = [fid for fid in self._size_groups.get(size_key, [])
                             if fid < frame_id]

        eligible = [
            cid for cid in same_size_ids
            if self._depths[cid] + 1 <= self.max_ref_depth
        ]
        if not eligible:
            return self._make_keyframe(image, frame_id)

        def _score(cid: int):
            diff = np.abs(new_arr - self._ref_arrays[cid]).max(axis=2)
            sim = float((diff <= self.threshold).sum()) / total_px
            return sim, cid

        if len(eligible) <= 2 or self.workers == 1:
            scored = [_score(cid) for cid in eligible]
        else:
            scored = list(self._pool.map(_score, eligible))

        scored.sort(key=lambda x: -x[0])
        candidates = [cid for (_, cid) in scored[: self.lto_candidates]]

        def _probe(cid: int):
            patches = compute_patches(
                self._ref_images[cid], image,
                threshold=self.threshold,
                tile_size=self.tile_size,
                merge_gap=self.merge_gap,
                codec=self.codec,
                quality=self.quality,
                workers=1,
                ref_arr=self._ref_arrays[cid],
                new_arr=new_arr,
            )
            return cid, patches, sum(len(p[4]) for p in patches)

        if len(candidates) <= 1 or self.workers == 1:
            results = [_probe(cid) for cid in candidates]
        else:
            results = list(self._pool.map(_probe, candidates))

        best_ref_id, best_patches, best_size = min(results, key=lambda r: r[2])

        iw, ih = image.size
        kf_data = _encode_crop(image, 0, 0, iw, ih, codec=self.codec, quality=self.quality)
        if len(kf_data) <= best_size:
            return self._make_keyframe(image, frame_id)

        return FRAME_DELTA, best_ref_id, best_patches

    def _encode_frame_manual(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """
        Manual-mode encoder: probe designated baseline frames first, then fall
        back to the configured fallback mode if no baseline wins.
        """
        total_px = new_arr.shape[0] * new_arr.shape[1]

        def _score_baseline(bid: int):
            diff = np.abs(new_arr - self._ref_arrays[bid]).max(axis=2)
            sim = float((diff <= self.threshold).sum()) / total_px
            return sim, bid

        if len(self._baseline_ids) <= 2 or self.workers == 1:
            scored = [_score_baseline(bid) for bid in self._baseline_ids]
        else:
            scored = list(self._pool.map(_score_baseline, self._baseline_ids))

        scored.sort(key=lambda x: -x[0])
        top_baselines = [bid for (_, bid) in scored[: self.lto_candidates]]

        def _probe_baseline(bid: int):
            patches = compute_patches(
                self._ref_images[bid], image,
                threshold=self.threshold,
                tile_size=self.tile_size,
                merge_gap=self.merge_gap,
                codec=self.codec,
                quality=self.quality,
                workers=1,
                ref_arr=self._ref_arrays[bid],
                new_arr=new_arr,
            )
            return bid, patches, sum(len(p[4]) for p in patches)

        if len(top_baselines) <= 1 or self.workers == 1:
            baseline_results = [_probe_baseline(bid) for bid in top_baselines]
        else:
            baseline_results = list(self._pool.map(_probe_baseline, top_baselines))

        best_bl_ref, best_bl_patches, best_bl_size = min(baseline_results, key=lambda r: r[2])

        fb_type, fb_ref_id, fb_patches = self._encode_frame_fallback(image, frame_id, new_arr)
        fb_size = sum(len(p[4]) for p in fb_patches)

        iw, ih = image.size
        kf_size = len(_encode_crop(image, 0, 0, iw, ih, codec=self.codec, quality=self.quality))

        if kf_size <= best_bl_size and kf_size <= fb_size:
            return self._make_keyframe(image, frame_id)
        if best_bl_size <= fb_size:
            return FRAME_DELTA, best_bl_ref, best_bl_patches
        return fb_type, fb_ref_id, fb_patches

    def _encode_frame_fallback(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """
        Run the configured fallback_mode for this frame.
        """
        content_id = frame_id - self._baseline_count

        if self.fallback_mode_id == MODE_VS_FIRST:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            ref_id = self._baseline_count
            return self._diff_against(image, frame_id, ref_id, new_arr)

        if self.fallback_mode_id == MODE_VS_PRIOR:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            return self._diff_against(image, frame_id, frame_id - 1, new_arr)

        if self.fallback_mode_id == MODE_KEYFRAME:
            if content_id == 0 or (content_id % self.keyframe_interval == 0):
                return self._make_keyframe(image, frame_id)
            return self._diff_against(image, frame_id, frame_id - 1, new_arr)

        if self.fallback_mode_id == MODE_LTO:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            return self._encode_frame_lto(image, frame_id, new_arr)

        return self._make_keyframe(image, frame_id)
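The LTO path above is two-stage: rank all eligible prior frames by a cheap pixel-similarity score, then fully probe only the top few and keep the cheapest. A condensed standalone model of that selection (payload size is approximated here by the changed-pixel count rather than real compressed bytes):

```python
import numpy as np

def pick_reference(new, refs, threshold=4, candidates=2):
    """Rank refs by similarity, probe the top `candidates`, return the index
    of the cheapest reference (toy model of two-stage LTO selection)."""
    def sim(i):
        diff = np.abs(new.astype(np.int16) - refs[i].astype(np.int16)).max(axis=2)
        return (diff <= threshold).mean()

    def payload(i):  # stand-in for summed compressed patch bytes
        diff = np.abs(new.astype(np.int16) - refs[i].astype(np.int16)).max(axis=2)
        return int((diff > threshold).sum())

    ranked = sorted(range(len(refs)), key=lambda i: -sim(i))[:candidates]
    return min(ranked, key=payload)

base = np.zeros((16, 16, 4), dtype=np.uint8)
far = np.full((16, 16, 4), 128, dtype=np.uint8)
near = base.copy()
near[0, 0] = 99                       # one pixel away from the all-zero frame
print(pick_reference(near, [far, base]))   # 1
```

The cheap ranking pass keeps the expensive probe (real patch compression in the library) down to `lto_candidates` attempts per frame.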
Sequential writer for .impak collections.
Parameters
path (output file path):
mode ("vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"):
keyframe_interval ((keyframe mode only) store a full image every N frames):
threshold (pixel delta considered "changed" (0 = perfectly lossless)):
tile_size (diff grid cell size in pixels):
merge_gap (pixels gap between changed tiles before they are merged):
auto_keyframe_sim (if similarity drops below this fraction a keyframe is): forced regardless of mode (0.0 = never force)
- workers (thread-pool size used for parallel patch compression and): LTO candidate probing. None = os.cpu_count(). Set to 1 to disable parallelism entirely.
- baselines ((manual mode) list of PIL Images or file paths used as): pinned reference anchors. They are stored as hidden leading keyframes so the delta chain is self-contained, but ImpakReader hides them from callers — only content frames are visible when iterating or indexing. Baselines are automatically resized to match the canvas size established by the first content frame (or the first baseline if all are the same size).
- fallback_mode ((manual mode) diff strategy applied when no baseline): yields a patch set smaller than this alternative. Accepts "vs_first", "vs_prior", "keyframe", or "lto". Defaults to "lto".
94 def __init__( 95 self, 96 path: Union[str, Path], 97 mode: str = "vs_first", 98 keyframe_interval: int = 10, 99 threshold: int = 4, 100 tile_size: int = 64, 101 merge_gap: int = 8, 102 auto_keyframe_sim: float = 0.5, 103 codec: str = "webp", 104 quality: int = 100, 105 lto_candidates: int = 6, 106 max_ref_depth: int = 8, 107 workers: Optional[int] = None, 108 baselines: Optional[List[Union[Image.Image, str, Path]]] = None, 109 fallback_mode: str = "lto", 110 ): 111 if mode not in MODE_FROM_NAME: 112 raise ValueError(f"mode must be one of {list(MODE_FROM_NAME)}") 113 if codec not in ("png", "webp"): 114 raise ValueError("codec must be 'png' or 'webp'") 115 if not (0 <= quality <= 100): 116 raise ValueError("quality must be 0-100") 117 if lto_candidates < 1: 118 raise ValueError("lto_candidates must be >= 1") 119 if max_ref_depth < 1: 120 raise ValueError("max_ref_depth must be >= 1") 121 122 if mode == "manual": 123 _valid_fallbacks = [m for m in MODE_FROM_NAME if m != "manual"] 124 if fallback_mode not in _valid_fallbacks: 125 raise ValueError( 126 f"fallback_mode must be one of {_valid_fallbacks}" 127 ) 128 if not baselines: 129 raise ValueError( 130 "manual mode requires at least one entry in 'baselines'" 131 ) 132 133 self.path = Path(path) 134 self.mode = mode 135 self.mode_id = MODE_FROM_NAME[mode] 136 self.keyframe_interval = keyframe_interval 137 self.threshold = threshold 138 self.tile_size = tile_size 139 self.merge_gap = merge_gap 140 self.auto_keyframe_sim = auto_keyframe_sim 141 self.codec = codec 142 self.quality = quality 143 self.lto_candidates = lto_candidates 144 self.max_ref_depth = max_ref_depth 145 self.workers = workers 146 self.fallback_mode = fallback_mode 147 self.fallback_mode_id = MODE_FROM_NAME.get(fallback_mode, MODE_LTO) 148 149 self._frames: list[dict] = [] 150 self._frame_data: list[bytes] = [] 151 self._ref_images: list[Image.Image] = [] 152 self._ref_arrays: list[np.ndarray] = [] 153 self._depths: list[int] = [] 154 155 156 
self._canvas: Optional[tuple[int, int]] = None 157 self._size_groups: dict[tuple[int, int], list[int]] = {} 158 self._closed = False 159 160 self._baseline_ids: list[int] = [] 161 self._baseline_count: int = 0 162 self._pending_baselines: list = list(baselines) if baselines else [] 163 164 _pool_size = workers if workers is not None else (os.cpu_count() or 4) 165 self._pool = ThreadPoolExecutor(max_workers=max(1, _pool_size))
167 def add( 168 self, 169 image: Union[Image.Image, str, Path], 170 name: Optional[str] = None, 171 metadata: Optional[dict] = None, 172 ) -> int: 173 """ 174 Add one content image to the collection. Returns the 0-based frame 175 index as seen by readers (i.e. not counting hidden baseline frames). 176 177 *image* may be a Pillow Image or a file path. 178 *name* is stored in per-frame metadata (useful for later retrieval). 179 *metadata* is an arbitrary JSON-serialisable dict merged with name. 180 """ 181 if self._closed: 182 raise RuntimeError("Writer is already closed") 183 184 if isinstance(image, (str, Path)): 185 image = Image.open(image) 186 187 image = image.convert("RGBA") 188 new_arr = np.array(image, dtype=np.int16) 189 190 if self._canvas is None: 191 self._canvas = (image.width, image.height) 192 193 if self._pending_baselines: 194 self._inject_baselines(self._pending_baselines) 195 self._pending_baselines = [] 196 197 frame_id = len(self._frames) 198 199 frame_type, ref_id, patches = self._encode_frame(image, frame_id, new_arr) 200 201 meta: dict = {} 202 if name: 203 meta["name"] = name 204 if metadata: 205 meta.update(metadata) 206 207 meta["_size"] = [image.width, image.height] 208 meta_bytes = json.dumps(meta, separators=(",", ":")).encode() if meta else b"" 209 210 parts: list[bytes] = [] 211 for (x, y, w, h, data) in patches: 212 parts.append(pack_patch_header(x, y, w, h, len(data))) 213 parts.append(data) 214 parts.append(meta_bytes) 215 frame_bytes = b"".join(parts) 216 217 if frame_type == FRAME_KEYFRAME: 218 self._depths.append(0) 219 else: 220 self._depths.append(self._depths[ref_id] + 1) 221 222 self._frames.append({ 223 "patch_count": len(patches), 224 "ref_frame_id": ref_id, 225 "metadata_len": len(meta_bytes), 226 "frame_type": frame_type, 227 }) 228 self._frame_data.append(frame_bytes) 229 self._ref_images.append(image) 230 self._ref_arrays.append(new_arr) 231 size_key = (image.width, image.height) 232 
self._size_groups.setdefault(size_key, []).append(frame_id) 233 234 return frame_id - self._baseline_count
Add one content image to the collection. Returns the 0-based frame index as seen by readers (i.e. not counting hidden baseline frames).
image may be a Pillow Image or a file path. name is stored in per-frame metadata (useful for later retrieval). metadata is an arbitrary JSON-serialisable dict merged with name.
236 def close(self): 237 """Finalise and write the file. Called automatically by __exit__.""" 238 if self._closed: 239 return 240 if not self._frames: 241 raise RuntimeError("No frames added — nothing to write") 242 243 self._pool.shutdown(wait=False) 244 245 w, h = self._canvas 246 frame_count = len(self._frames) 247 248 index_offset = FILE_HEADER_SIZE 249 data_start = index_offset + frame_count * FRAME_INDEX_ENTRY_SIZE 250 offsets: list[int] = [] 251 cursor = data_start 252 for fd in self._frame_data: 253 offsets.append(cursor) 254 cursor += len(fd) 255 256 self.path.parent.mkdir(parents=True, exist_ok=True) 257 258 with open(self.path, "wb") as f: 259 f.write(pack_file_header( 260 self.mode_id, frame_count, index_offset, w, h, 261 codec=CODEC_FROM_NAME[self.codec], 262 quality=self.quality, 263 )) 264 for i, fm in enumerate(self._frames): 265 f.write(pack_index_entry( 266 data_offset=offsets[i], 267 patch_count=fm["patch_count"], 268 ref_frame_id=fm["ref_frame_id"], 269 metadata_len=fm["metadata_len"], 270 frame_type=fm["frame_type"], 271 )) 272 for fd in self._frame_data: 273 f.write(fd) 274 275 self._closed = True
Finalise and write the file. Called automatically by __exit__.
288 @property 289 def frame_count(self) -> int: 290 """Total frames including hidden baselines.""" 291 return len(self._frames)
Total frames including hidden baselines.
293 @property 294 def content_frame_count(self) -> int: 295 """Content frames only (what callers see).""" 296 return len(self._frames) - self._baseline_count
Content frames only (what callers see).
302 @property 303 def stats(self) -> list[dict]: 304 """Per-frame stats including hidden baseline frames.""" 305 result = [] 306 for i, (fm, fd) in enumerate(zip(self._frames, self._frame_data)): 307 is_baseline = i in self._baseline_ids 308 result.append({ 309 "frame_id": i, 310 "content_id": None if is_baseline else i - self._baseline_count, 311 "frame_type": "keyframe" if fm["frame_type"] == FRAME_KEYFRAME else "delta", 312 "ref_frame_id": fm["ref_frame_id"], 313 "patch_count": fm["patch_count"], 314 "data_bytes": len(fd), 315 "is_baseline": is_baseline, 316 }) 317 return result
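Each `stats` entry is a plain dict, so collection-level summaries are simple aggregations. A hedged sketch (`summarize` and the sample entries below are illustrative, not part of the library):

```python
def summarize(stats):
    """Aggregate per-frame stats into totals with a keyframe/delta split."""
    total = sum(s["data_bytes"] for s in stats)
    keyframes = sum(1 for s in stats if s["frame_type"] == "keyframe")
    return {
        "total_bytes": total,
        "keyframes": keyframes,
        "deltas": len(stats) - keyframes,
    }

# Made-up entries in the shape stats produces:
sample = [
    {"frame_type": "keyframe", "data_bytes": 5000, "is_baseline": True},
    {"frame_type": "delta", "data_bytes": 300, "is_baseline": False},
    {"frame_type": "delta", "data_bytes": 450, "is_baseline": False},
]
report = summarize(sample)
```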
class ImpakReader:
    """
    Random-access reader for .impak collections.

    The file index is read at open time (small); pixel data is lazy-loaded
    per frame request.

    In manual-mode files the leading baseline keyframes are automatically
    detected (via their {"_baseline": true} metadata) and excluded from all
    public-facing operations. They are still used internally when decoding
    delta frames that reference them.
    """

    def __init__(
        self,
        path: Union[str, Path],
        low_ram_mode: bool = False,
        cache_size: Optional[int] = None,
    ):
        self.path = Path(path)
        self._fh = open(self.path, "rb")
        self._header: dict = {}
        self._index: list[dict] = []
        self._name_map: dict[str, int] = {}  # name → absolute frame id
        self._meta_cache: dict[int, dict] = {}

        self.low_ram_mode = low_ram_mode
        if cache_size is None:
            cache_size = 2 if low_ram_mode else 0
        self.cache_size = max(0, cache_size)

        # abs_id -> Image.Image
        # caching is disabled when cache_size == 0 (the default outside
        # low_ram_mode); in low_ram_mode the cache is a bounded LRU
        self._decode_cache: "OrderedDict[int, Image.Image]" = OrderedDict()

        self._read_header()
        self._read_index()

        self._baseline_count: int = self._detect_baseline_count()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        if self._fh and not self._fh.closed:
            self._fh.close()

    def __len__(self) -> int:
        """Number of content frames (baseline frames are not counted)."""
        return self._header["frame_count"] - self._baseline_count

    def __getitem__(self, key: Union[int, str]) -> Image.Image:
        """
        Retrieve a content frame by index or name.

        Integer indices are relative to content frames only (0 = first content
        frame, regardless of how many baselines precede it).
        """
        if isinstance(key, str):
            self._build_name_map()
            if key not in self._name_map:
                raise KeyError(f"Frame name '{key}' not found in collection")
            abs_id = self._name_map[key]
            return self._decode_frame(abs_id)

        if key < 0:
            key = len(self) + key
        if not (0 <= key < len(self)):
            raise IndexError(f"Frame index {key} out of range (0..{len(self) - 1})")
        return self._decode_frame(self._content_to_abs(key))

    def __iter__(self) -> Iterator[Image.Image]:
        for i in range(len(self)):
            yield self._decode_frame(self._content_to_abs(i))

    def _cache_get(self, abs_id: int) -> Optional[Image.Image]:
        img = self._decode_cache.get(abs_id)
        if img is None:
            return None
        # LRU touch
        self._decode_cache.move_to_end(abs_id)
        return img.copy()

    def _cache_put(self, abs_id: int, img: Image.Image) -> None:
        if self.cache_size == 0:
            return

        self._decode_cache[abs_id] = img.copy()
        self._decode_cache.move_to_end(abs_id)

        if self.low_ram_mode:
            while len(self._decode_cache) > self.cache_size:
                self._decode_cache.popitem(last=False)

    def get_metadata(self, frame_id: int) -> dict:
        """Return the metadata dict for a content frame by content index."""
        return self._read_metadata(self._content_to_abs(frame_id))

    def info(self) -> str:
        """Human-readable summary of the collection."""
        h = self._header
        mode_name = MODE_NAMES.get(h["diff_mode"], "unknown")
        codec_name = CODEC_NAMES.get(h.get("codec", 0), "png")
        quality = h.get("quality", 100)
        quality_str = "lossless" if quality == 100 else f"quality={quality}"
        total_bytes = self.path.stat().st_size
        n_content = len(self)
        lines = [
            f"File   : {self.path}",
            f"Format : impak v{h['version']}",
            f"Diff   : {mode_name}",
            f"Codec  : {codec_name} ({quality_str})",
            f"Canvas : {h['width']} × {h['height']} px",
            f"Frames : {n_content}"
            + (f" ({self._baseline_count} hidden baseline(s))" if self._baseline_count else ""),
            f"Size   : {total_bytes:,} bytes ({total_bytes / 1024:.1f} KB)",
            "",
        ]

        frame_sizes = {self._frame_size(self._content_to_abs(i)) for i in range(n_content)}
        mixed_sizes = len(frame_sizes) > 1

        if mixed_sizes:
            lines += [
                f"{'ID':>4} {'Type':>9} {'Ref':>4} {'Patches':>7} {'Bytes':>9} {'Size':>11} Name",
                "─" * 72,
            ]
        else:
            lines += [
                f"{'ID':>4} {'Type':>9} {'Ref':>4} {'Patches':>7} {'Bytes':>9} Name",
                "─" * 64,
            ]

        for content_id in range(n_content):
            abs_id = self._content_to_abs(content_id)
            entry = self._index[abs_id]
            ftype = "keyframe" if entry["frame_type"] == FRAME_KEYFRAME else "delta"
            ref_abs = entry["ref_frame_id"]
            if entry["frame_type"] == FRAME_DELTA:
                if ref_abs < self._baseline_count:
                    ref = f"B{ref_abs}"
                else:
                    ref = str(ref_abs - self._baseline_count)
            else:
                ref = "self"
            data_bytes = self._frame_data_size(abs_id)
            meta = self._read_metadata(abs_id)
            name = meta.get("name", "")
            if mixed_sizes:
                fw, fh = self._frame_size(abs_id)
                size_str = f"{fw}×{fh}"
                lines.append(
                    f"{content_id:>4} {ftype:>9} {ref:>4} "
                    f"{entry['patch_count']:>7} {data_bytes:>9,} {size_str:>11} {name}"
                )
            else:
                lines.append(
                    f"{content_id:>4} {ftype:>9} {ref:>4} "
                    f"{entry['patch_count']:>7} {data_bytes:>9,} {name}"
                )
        return "\n".join(lines)

    def diff_map(self, frame_id: int) -> list[tuple[int, int, int, int]]:
        """
        Return the (x, y, w, h) patch rectangles for a content frame.
        *frame_id* is a content-visible index.
        """
        abs_id = self._content_to_abs(frame_id)
        entry = self._index[abs_id]
        if entry["frame_type"] == FRAME_KEYFRAME:
            w, h = self._frame_size(abs_id)
            return [(0, 0, w, h)]
        patches = self._read_patches(abs_id)
        return [(x, y, w, h) for (x, y, w, h, _) in patches]

    @property
    def canvas_size(self) -> tuple[int, int]:
        return self._header["width"], self._header["height"]

    @property
    def mode(self) -> str:
        return MODE_NAMES.get(self._header["diff_mode"], "unknown")

    @property
    def baseline_count(self) -> int:
        """Number of hidden baseline frames (0 for non-manual files)."""
        return self._baseline_count

    def _frame_size(self, abs_id: int) -> tuple[int, int]:
        """
        Return (width, height) for a frame.

        For files written with multi-size support the per-frame size is stored
        in JSON metadata under "_size". For older single-size files (or frames
        that pre-date the feature) we fall back to the global canvas dimensions.
        """
        meta = self._read_metadata(abs_id)
        if "_size" in meta:
            return tuple(meta["_size"])
        return self._header["width"], self._header["height"]

    def _content_to_abs(self, content_id: int) -> int:
        """Convert a public content index to an absolute frame index."""
        return content_id + self._baseline_count

    def _decode_frame(self, abs_id: int, _memo: Optional[dict[int, Image.Image]] = None) -> Image.Image:
        """Decode frame by absolute index (works for baselines too)."""
        if _memo is None:
            _memo = {}

        if abs_id in _memo:
            return _memo[abs_id].copy()

        cached = self._cache_get(abs_id)
        if cached is not None:
            _memo[abs_id] = cached
            return cached.copy()

        entry = self._index[abs_id]
        codec = self.codec

        if entry["frame_type"] == FRAME_KEYFRAME:
            patches = self._read_patches(abs_id)
            assert len(patches) == 1
            img = self._decompress_patch_to_image(patches[0][4], codec)
        else:
            ref_id = entry["ref_frame_id"]
            ref_img = self._decode_frame(ref_id, _memo=_memo)
            patches = self._read_patches(abs_id)
            img = reconstruct(ref_img, patches, codec=codec)

        img = img.convert("RGBA")
        _memo[abs_id] = img
        self._cache_put(abs_id, img)
        return img.copy()

    def _read_header(self):
        self._fh.seek(0)
        raw = self._fh.read(FILE_HEADER_SIZE)
        if len(raw) < FILE_HEADER_SIZE:
            raise ValueError("File too small to be a valid .impak archive")
        h = unpack_file_header(raw)
        if h["magic"] != MAGIC:
            raise ValueError("Not a valid .impak file (bad magic bytes)")
        self._header = h

    def _read_index(self):
        self._fh.seek(self._header["index_offset"])
        count = self._header["frame_count"]
        self._index = []
        for _ in range(count):
            raw = self._fh.read(FRAME_INDEX_ENTRY_SIZE)
            self._index.append(unpack_index_entry(raw))

    def _read_patches(self, abs_id: int) -> list:
        entry = self._index[abs_id]
        self._fh.seek(entry["data_offset"])
        patches = []
        for _ in range(entry["patch_count"]):
            hdr = self._fh.read(PATCH_HEADER_SIZE)
            x, y, w, h, data_len = unpack_patch_header(hdr)
            data = self._fh.read(data_len)
            patches.append((x, y, w, h, data))
        return patches

    def _read_metadata(self, abs_id: int) -> dict:
        if abs_id in self._meta_cache:
            return self._meta_cache[abs_id]
        entry = self._index[abs_id]
        meta_len = entry["metadata_len"]
        if meta_len == 0:
            self._meta_cache[abs_id] = {}
            return {}
        self._fh.seek(entry["data_offset"])
        for _ in range(entry["patch_count"]):
            hdr = self._fh.read(PATCH_HEADER_SIZE)
            _, _, _, _, data_len = unpack_patch_header(hdr)
            self._fh.seek(data_len, 1)
        raw = self._fh.read(meta_len)
        try:
            meta = json.loads(raw.decode())
        except Exception:
            meta = {}
        self._meta_cache[abs_id] = meta
        return meta

    def _frame_data_size(self, abs_id: int) -> int:
        entry = self._index[abs_id]
        total = self._header["frame_count"]
        if abs_id + 1 < total:
            next_off = self._index[abs_id + 1]["data_offset"]
        else:
            next_off = self.path.stat().st_size
        return next_off - entry["data_offset"]

    def _decompress_patch_to_image(self, compressed: bytes, codec: str = "png") -> Image.Image:
        from .differ import _decode_patch
        return _decode_patch(compressed, codec=codec)

    def _detect_baseline_count(self) -> int:
        """
        Scan leading frames for the {"_baseline": true} metadata tag.
        Returns the count of consecutive baseline frames at the front.
        This is how the reader discovers how many frames to hide, without
        any binary format change.
        """
        count = 0
        total = self._header["frame_count"]
        for abs_id in range(total):
            meta = self._read_metadata(abs_id)
            if meta.get("_baseline"):
                count += 1
            else:
                break  # baselines are always contiguous at the front
        return count

    @property
    def codec(self) -> str:
        return CODEC_NAMES.get(self._header.get("codec", 0), "png")

    @property
    def quality(self) -> int:
        return self._header.get("quality", 100)

    def _build_name_map(self):
        if self._name_map:
            return
        for content_id in range(len(self)):
            abs_id = self._content_to_abs(content_id)
            meta = self._read_metadata(abs_id)
            if "name" in meta:
                self._name_map[meta["name"]] = abs_id

    @classmethod
    def load_all(
        cls,
        path: Union[str, Path],
        low_ram_mode: bool = False,
        cache_size: Optional[int] = None,
    ) -> List[Image.Image]:
        """Load every content frame and return as a list of PIL Images."""
        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
            return list(r)

    @classmethod
    def load_frame(
        cls,
        path: Union[str, Path],
        frame_id: int,
        low_ram_mode: bool = False,
        cache_size: Optional[int] = None,
    ) -> Image.Image:
        """Load a single content frame by content index."""
        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
            return r[frame_id]
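The `_cache_get`/`_cache_put` pair implements the standard `OrderedDict` LRU idiom: `move_to_end` on every hit, `popitem(last=False)` to evict the oldest entry when over capacity. The same pattern isolated as a self-contained sketch (`LRUCache` is illustrative, not a class in the library):

```python
from collections import OrderedDict

class LRUCache:
    """Minimal bounded LRU, mirroring the reader's cache logic."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        if self.capacity == 0:
            return  # caching disabled, same as cache_size == 0 in the reader
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used

cache = LRUCache(2)
cache.put(0, "a")
cache.put(1, "b")
cache.get(0)       # touch 0 so it survives the next eviction
cache.put(2, "c")  # evicts 1, now the least recently used
```

With `low_ram_mode` and a `cache_size` of 2, this is exactly the trade the reader makes: long delta chains may re-decode reference frames, but at most two decoded frames are held at once.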
def compute_patches(
    ref_img: Image.Image,
    new_img: Image.Image,
    threshold: int = 4,
    tile_size: int = 32,
    merge_gap: int = 8,
    codec: str = "png",
    quality: int = 100,
    workers: Optional[int] = None,
    ref_arr: Optional[np.ndarray] = None,
    new_arr: Optional[np.ndarray] = None,
) -> List[Patch]:
    """
    Compare *new_img* against *ref_img* using a tile grid.

    Parameters
    ----------
    threshold : per-channel absolute pixel delta that counts as "changed"
        (use 0 for perfectly lossless; 3-6 absorbs JPEG noise)
    tile_size : grid cell size in pixels (smaller = finer granularity but
        more patch overhead; 16–64 is a good range)
    merge_gap : changed tiles within this many pixels of each other on the
        same row are merged into one patch (reduces patch count)
    workers : thread-pool size for parallel patch compression.
        None = use the ThreadPoolExecutor default (min(32, cpu_count + 4)
        on recent Python versions). Set to 1 to disable parallelism entirely.
    """
    if ref_arr is None:
        ref = np.array(ref_img.convert("RGBA"), dtype=np.int16)
    else:
        ref = ref_arr
        if ref.dtype != np.int16:
            ref = ref.astype(np.int16, copy=False)

    if new_arr is None:
        new = np.array(new_img.convert("RGBA"), dtype=np.int16)
    else:
        new = new_arr
        if new.dtype != np.int16:
            new = new.astype(np.int16, copy=False)

    if ref.shape != new.shape:
        raise ValueError(
            f"Image dimensions differ: ref={ref.shape[:2]} new={new.shape[:2]}"
        )

    h, w = ref.shape[:2]
    diff = np.abs(new - ref).max(axis=2)

    cols = (w + tile_size - 1) // tile_size
    rows = (h + tile_size - 1) // tile_size

    pad_h = rows * tile_size - h
    pad_w = cols * tile_size - w
    if pad_h or pad_w:
        diff_padded = np.pad(diff, ((0, pad_h), (0, pad_w)), constant_values=0)
    else:
        diff_padded = diff

    tile_max = (
        diff_padded
        .reshape(rows, tile_size, cols, tile_size)
        .max(axis=(1, 3))
    )
    changed_mask = tile_max > threshold
    tile_rows, tile_cols = np.where(changed_mask)
    changed_tiles = list(zip(tile_rows.tolist(), tile_cols.tolist()))

    if not changed_tiles:
        return []

    rects = _tiles_to_rects(changed_tiles, tile_size, w, h)
    merged = _merge_rects(rects, merge_gap, tile_size, w, h)

    if len(merged) <= 2 or workers == 1:
        patches: List[Patch] = [
            (rx, ry, rw, rh, _encode_crop(new_img, rx, ry, rw, rh, codec=codec, quality=quality))
            for (rx, ry, rw, rh) in merged
        ]
    else:
        def _compress_rect(rect: tuple) -> Patch:
            rx, ry, rw, rh = rect
            return rx, ry, rw, rh, _encode_crop(new_img, rx, ry, rw, rh, codec=codec, quality=quality)

        with ThreadPoolExecutor(max_workers=workers) as pool:
            patches = list(pool.map(_compress_rect, merged))

    return patches
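The heart of the tile scan is the reshape reduction: view the padded diff map as `(rows, tile, cols, tile)` and take the max over the two tile axes, giving one value per grid cell in a single vectorised step. A self-contained NumPy sketch on a toy 4×4 map with 2×2 tiles (`tile_max` here is an illustrative re-derivation, not the library function):

```python
import numpy as np

def tile_max(diff: np.ndarray, tile_size: int) -> np.ndarray:
    """Per-tile maximum of a 2-D diff map, zero-padding ragged edges."""
    h, w = diff.shape
    rows = (h + tile_size - 1) // tile_size
    cols = (w + tile_size - 1) // tile_size
    padded = np.zeros((rows * tile_size, cols * tile_size), dtype=diff.dtype)
    padded[:h, :w] = diff
    # (rows, tile, cols, tile) view -> reduce over the two tile axes
    return padded.reshape(rows, tile_size, cols, tile_size).max(axis=(1, 3))

diff = np.array([
    [0, 0, 9, 0],
    [0, 2, 0, 0],
    [0, 0, 0, 0],
    [7, 0, 0, 3],
])
tm = tile_max(diff, 2)  # 2×2 grid of per-tile maxima
changed = tm > 4        # which tiles exceed the threshold
```

Zero-padding is safe here because padding can never push a tile's maximum above the threshold.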
def reconstruct(base_img: Image.Image, patches: List[Patch], codec: str = "png") -> Image.Image:
    """
    Apply a list of patches onto *base_img* and return a new Image.

    *base_img* is not modified in place.
    *codec* must match the codec used when the patches were encoded.
    """
    result = base_img.copy().convert("RGBA")
    for (x, y, w, h, compressed) in patches:
        patch_img = _decode_patch(compressed, codec=codec).convert("RGBA")
        result.paste(patch_img, (x, y))
    return result
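`reconstruct` is a sequence of rectangle overwrites. The same operation expressed on raw NumPy arrays, with already-decoded pixel blocks standing in for the compressed patch payloads (an assumption for illustration; real patches carry encoded bytes):

```python
import numpy as np

def apply_patches(base: np.ndarray, patches) -> np.ndarray:
    """Copy base and overwrite each (x, y, w, h, pixels) rectangle."""
    out = base.copy()  # never mutate the caller's array
    for (x, y, w, h, pixels) in patches:
        out[y:y + h, x:x + w] = pixels
    return out

base = np.zeros((4, 4), dtype=np.uint8)
patch = (1, 2, 2, 2, np.full((2, 2), 255, dtype=np.uint8))
result = apply_patches(base, [patch])
```

Note the (x, y) → [y, x] swap: patch rectangles are column-first, array indexing is row-first.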
def similarity_score(img_a: Image.Image, img_b: Image.Image) -> float:
    """
    Return fraction of pixels that are identical (0.0 = completely different,
    1.0 = identical). Useful for deciding whether to force a keyframe.
    """
    a = np.array(img_a.convert("RGBA"), dtype=np.int16)
    b = np.array(img_b.convert("RGBA"), dtype=np.int16)
    if a.shape != b.shape:
        return 0.0
    diff = np.abs(a - b).max(axis=2)
    identical_pixels = int((diff == 0).sum())
    total = a.shape[0] * a.shape[1]
    return identical_pixels / total
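`similarity_score` reduces the per-channel delta with `max(axis=2)` and then counts the zeros: a pixel counts as identical only if every channel matches. A tiny worked example (the standalone `similarity` helper mirrors that logic on raw arrays):

```python
import numpy as np

def similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Fraction of pixels identical across all channels of RGBA arrays."""
    if a.shape != b.shape:
        return 0.0
    # int16 avoids uint8 wraparound when subtracting
    diff = np.abs(a.astype(np.int16) - b.astype(np.int16)).max(axis=2)
    return float((diff == 0).sum()) / (a.shape[0] * a.shape[1])

a = np.zeros((2, 2, 4), dtype=np.uint8)
b = a.copy()
b[0, 0, 0] = 10           # change one channel of one pixel
score = similarity(a, b)  # 3 of the 4 pixels remain identical
```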