impak

impak – space-efficient image collection format based on pixel-diff patches.

Quick start

Pack a folder of PNG variants into a single file:

import impak

with impak.create("variants.impak", mode="vs_first") as w:
    for path in sorted(Path("frames/").glob("*.png")):
        w.add(path, name=path.stem)

Read them back:

with impak.open("variants.impak") as r:
    print(r.info())          # human-readable summary
    img = r[0]               # PIL Image, by index
    img = r["frame_01"]      # by name
    for img in r:            # iterate all
        img.show()
Diff modes
| Mode | Description |
|------|-------------|
| `lto` *(default)* | Each frame searches all prior frames and picks the best reference (smallest patch payload), forming a DAG of deltas. A keyframe is stored when it's smaller than the best delta. Params: `lto_candidates` (default 6), `max_ref_depth` (default 8). |
| `vs_first` | Every frame is a patch against frame 0. |
| `vs_prior` | Every frame is a patch against the immediately prior frame. |
| `keyframe` | Like `vs_prior` but stores a full image every K frames. Param: `keyframe_interval` (default 10). |
| `manual` | User-designated baselines are stored first and searched before each content frame. Params: `baselines` (required), `fallback_mode` (default `lto`). |
Example:

    with impak.create(
        "out.impak",
        mode="manual",
        baselines=["day_bg.png", "night_bg.png"],
        fallback_mode="vs_prior",
    ) as w:
        for path in content_frames:
            w.add(path)
 1"""
 2impak – space-efficient image collection format based on pixel-diff patches.
 3
 4Quick start
 5-----------
 6Pack a folder of PNG variants into a single file:
 7
 8    import impak
 9
10    with impak.create("variants.impak", mode="vs_first") as w:
11        for path in sorted(Path("frames/").glob("*.png")):
12            w.add(path, name=path.stem)
13
14Read them back:
15
16    with impak.open("variants.impak") as r:
17        print(r.info())          # human-readable summary
18        img = r[0]               # PIL Image, by index
19        img = r["frame_01"]      # by name
20        for img in r:            # iterate all
21            img.show()
22
23Diff modes
24----------
25| Mode | Description |
26|------|-------------|
27| ``lto`` *(default)* | Each frame searches all prior frames and picks the best reference (smallest patch payload), forming a DAG of deltas. A keyframe is stored when it's smaller than the best delta. Params: ``lto_candidates`` (default 6), ``max_ref_depth`` (default 8). |
28| ``vs_first`` | Every frame is a patch against frame 0. |
29| ``vs_prior`` | Every frame is a patch against the immediately prior frame. |
30| ``keyframe`` | Like ``vs_prior`` but stores a full image every K frames. Param: ``keyframe_interval`` (default 10). |
31| ``manual`` | User-designated baselines are stored first and searched before each content frame. Params: ``baselines`` (required), ``fallback_mode`` (default ``lto``). |
32
33    Example:
34
35        with impak.create(
36            "out.impak",
37            mode="manual",
38            baselines=["day_bg.png", "night_bg.png"],
39            fallback_mode="vs_prior",
40        ) as w:
41            for path in content_frames:
42                w.add(path)
43
44"""
45
46from .encoder import ImpakWriter
47from .decoder import ImpakReader
48from .differ import compute_patches, reconstruct, similarity_score
49
50
def create(path, mode="lto", **kwargs) -> ImpakWriter:
    """
    Create a new .impak collection for writing.

    Returns an :class:`ImpakWriter`; use it as a context manager so the
    file is finalised on exit.

    Parameters
    ----------
    path              : file path (str or Path)
    mode              : "vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"
    codec             : "png" or "webp" (default)
    quality           : 0-100. 100 = lossless for both codecs.
                        Values <100 enable lossy WebP (smaller, not pixel-perfect).
    keyframe_interval : (keyframe mode) full image every N frames (default 10)
    threshold         : pixel delta threshold 0-255 (default 4)
    tile_size         : diff grid tile size in px (default 64)
    merge_gap         : merge adjacent changed tiles within N px (default 8)
    auto_keyframe_sim : force keyframe when similarity drops below this (default 0.5)
    lto_candidates    : (lto / manual mode) how many top-similarity frames to
                        fully probe (default 6)
    max_ref_depth     : (lto mode) max decode-chain depth before forcing a keyframe
                        (default 8)
    baselines         : (manual mode, required) list of PIL Images or file paths
                        stored as pinned keyframes at the start of the file;
                        every content frame is diffed against these first.
    fallback_mode     : (manual mode) strategy used when no baseline wins;
                        one of "vs_first", "vs_prior", "keyframe", "lto"
                        (default "lto", matching ImpakWriter)
    """
    # All keyword arguments are forwarded verbatim; defaults therefore come
    # from ImpakWriter.__init__ (tile_size=64, fallback_mode="lto", ...).
    return ImpakWriter(path, mode=mode, **kwargs)
77
78
def open(path, low_ram_mode=False, cache_size=None, **kwargs) -> ImpakReader:
    """
    Open an existing .impak file for reading.

    Parameters
    ----------
    path         : file path (str or Path)
    low_ram_mode : forwarded to ImpakReader
    cache_size   : forwarded to ImpakReader
    **kwargs     : any further ImpakReader options, passed through verbatim

    Returns an :class:`ImpakReader`; use it as a context manager.
    """
    return ImpakReader(path, low_ram_mode=low_ram_mode, cache_size=cache_size, **kwargs)
86
87
# Public API surface for ``from impak import *`` (order preserved for docs).
__all__ = [
    "create",
    "open",
    "ImpakWriter",
    "ImpakReader",
    "compute_patches",
    "reconstruct",
    "similarity_score",
]

# Package version (PEP 396 style single source of truth).
__version__ = "0.1.3"
def create(path, mode="lto", **kwargs) -> ImpakWriter:
    """
    Create a new .impak collection for writing.

    Parameters
    ----------
    path              : file path (str or Path)
    mode              : "vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"
    codec             : "png" or "webp" (default)
    quality           : 0-100. 100 = lossless for both codecs.
                        Values <100 enable lossy WebP (smaller, not pixel-perfect).
    keyframe_interval : (keyframe mode) full image every N frames (default 10)
    threshold         : pixel delta threshold 0-255 (default 4)
    tile_size         : diff grid tile size in px (default 64)
    merge_gap         : merge adjacent changed tiles within N px (default 8)
    auto_keyframe_sim : force keyframe when similarity drops below this (default 0.5)
    lto_candidates    : (lto / manual mode) how many top-similarity frames to
                        fully probe (default 6)
    max_ref_depth     : (lto mode) max decode-chain depth before forcing a keyframe
                        (default 8)
    baselines         : (manual mode, required) list of PIL Images or file paths
                        stored as pinned keyframes at the start of the file;
                        every content frame is diffed against these first.
    fallback_mode     : (manual mode) strategy used when no baseline wins;
                        one of "vs_first", "vs_prior", "keyframe", "lto"
                        (default "lto", matching ImpakWriter)
    """
    return ImpakWriter(path, mode=mode, **kwargs)
Parameters
  • path (file path (str or Path)):

  • mode ("vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"):

  • codec ("png" or "webp" (default)):

  • quality (0-100. 100 = lossless for both codecs.): Values <100 enable lossy WebP (smaller, not pixel-perfect).

  • keyframe_interval ((keyframe mode) full image every N frames (default 10)):

  • threshold (pixel delta threshold 0-255 (default 4)):

  • tile_size: diff grid tile size in px (default 64)

  • merge_gap (merge adjacent changed tiles within N px (default 8)):

  • auto_keyframe_sim (force keyframe when similarity drops below this (default 0.5)):

  • lto_candidates ((lto / manual mode) how many top-similarity frames to): fully probe (default 6)

  • max_ref_depth ((lto mode) max decode-chain depth before forcing a keyframe): (default 8)
  • baselines ((manual mode, required) list of PIL Images or file paths): stored as pinned keyframes at the start of the file; every content frame is diffed against these first.
  • fallback_mode: (manual mode) strategy used when no baseline wins; one of "vs_first", "vs_prior", "keyframe", "lto" (default "lto")
def open(path, low_ram_mode=False, cache_size=None, **kwargs) -> ImpakReader:
    """
    Open an existing .impak file for reading.

    *low_ram_mode*, *cache_size* and any extra keyword arguments are passed
    straight through to :class:`ImpakReader`.
    """
    return ImpakReader(
        path,
        low_ram_mode=low_ram_mode,
        cache_size=cache_size,
        **kwargs,
    )
class ImpakWriter:
    """
    Sequential writer for .impak collections.

    Frames are buffered in memory as they are added via :meth:`add`; the
    container (header, frame index, payloads) is laid out and written in one
    pass by :meth:`close`.

    Parameters
    ----------
    path              : output file path
    mode              : "vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"
    keyframe_interval : (keyframe mode only) store a full image every N frames
    threshold         : pixel delta considered "changed" (0 = perfectly lossless)
    tile_size         : diff grid cell size in pixels
    merge_gap         : pixels gap between changed tiles before they are merged
    auto_keyframe_sim : if similarity drops below this fraction a keyframe is
                        forced regardless of mode (0.0 = never force)
    codec             : patch payload codec, "png" or "webp"
    quality           : codec quality 0-100 (100 = lossless)
    lto_candidates    : (lto / manual mode) number of top-similarity reference
                        candidates that are fully probed per frame
    max_ref_depth     : (lto mode) maximum delta-chain depth before a keyframe
                        is forced
    workers           : thread-pool size used for parallel patch compression and
                        LTO candidate probing.  None = os.cpu_count().
                        Set to 1 to disable parallelism entirely.
    baselines         : (manual mode) list of PIL Images or file paths used as
                        pinned reference anchors.  They are stored as hidden
                        leading keyframes so the delta chain is self-contained,
                        but ImpakReader hides them from callers — only content
                        frames are visible when iterating or indexing.
                        Baselines are automatically resized to match the canvas
                        size established by the first content frame (or the
                        first baseline if all are the same size).
    fallback_mode     : (manual mode) diff strategy applied when no baseline
                        yields a patch set smaller than this alternative.
                        Accepts "vs_first", "vs_prior", "keyframe", or "lto".
                        Defaults to "lto".
    """

    def __init__(
        self,
        path: Union[str, Path],
        mode: str = "vs_first",
        keyframe_interval: int = 10,
        threshold: int = 4,
        tile_size: int = 64,
        merge_gap: int = 8,
        auto_keyframe_sim: float = 0.5,
        codec: str = "webp",
        quality: int = 100,
        lto_candidates: int = 6,
        max_ref_depth: int = 8,
        workers: Optional[int] = None,
        baselines: Optional[List[Union[Image.Image, str, Path]]] = None,
        fallback_mode: str = "lto",
    ):
        # Fail fast on invalid configuration before any state is created.
        if mode not in MODE_FROM_NAME:
            raise ValueError(f"mode must be one of {list(MODE_FROM_NAME)}")
        if codec not in ("png", "webp"):
            raise ValueError("codec must be 'png' or 'webp'")
        if not (0 <= quality <= 100):
            raise ValueError("quality must be 0-100")
        if lto_candidates < 1:
            raise ValueError("lto_candidates must be >= 1")
        if max_ref_depth < 1:
            raise ValueError("max_ref_depth must be >= 1")

        if mode == "manual":
            # "manual" cannot be its own fallback — that would recurse.
            _valid_fallbacks = [m for m in MODE_FROM_NAME if m != "manual"]
            if fallback_mode not in _valid_fallbacks:
                raise ValueError(
                    f"fallback_mode must be one of {_valid_fallbacks}"
                )
            if not baselines:
                raise ValueError(
                    "manual mode requires at least one entry in 'baselines'"
                )

        self.path = Path(path)
        self.mode = mode
        self.mode_id = MODE_FROM_NAME[mode]
        self.keyframe_interval = keyframe_interval
        self.threshold = threshold
        self.tile_size = tile_size
        self.merge_gap = merge_gap
        self.auto_keyframe_sim = auto_keyframe_sim
        self.codec = codec
        self.quality = quality
        self.lto_candidates = lto_candidates
        self.max_ref_depth = max_ref_depth
        self.workers = workers
        self.fallback_mode = fallback_mode
        # Falls back to MODE_LTO for unknown names; only reached for
        # non-manual modes where fallback_mode is never validated above.
        self.fallback_mode_id = MODE_FROM_NAME.get(fallback_mode, MODE_LTO)

        # Per-frame bookkeeping: parallel lists indexed by frame_id.
        self._frames: list[dict] = []               # index-entry fields per frame
        self._frame_data: list[bytes] = []          # serialized payload per frame
        self._ref_images: list[Image.Image] = []    # decoded images kept as diff references
        self._ref_arrays: list[np.ndarray] = []     # int16 pixel arrays (cached conversions)
        self._depths: list[int] = []                # delta-chain depth, 0 = keyframe


        # Canvas (w, h) is fixed by the first image added.
        self._canvas: Optional[tuple[int, int]] = None
        self._size_groups: dict[tuple[int, int], list[int]] = {}
        self._closed = False

        # Manual-mode baselines are injected lazily on the first add() so the
        # canvas size is known before they are (possibly) resized.
        self._baseline_ids: list[int] = []
        self._baseline_count: int = 0
        self._pending_baselines: list = list(baselines) if baselines else []

        _pool_size = workers if workers is not None else (os.cpu_count() or 4)
        self._pool = ThreadPoolExecutor(max_workers=max(1, _pool_size))

    def add(
        self,
        image: Union[Image.Image, str, Path],
        name: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> int:
        """
        Add one content image to the collection.  Returns the 0-based frame
        index as seen by readers (i.e. not counting hidden baseline frames).

        *image* may be a Pillow Image or a file path.
        *name* is stored in per-frame metadata (useful for later retrieval).
        *metadata* is an arbitrary JSON-serialisable dict merged with name.
        """
        if self._closed:
            raise RuntimeError("Writer is already closed")

        if isinstance(image, (str, Path)):
            image = Image.open(image)

        # Normalise to RGBA; int16 arrays keep pixel differences signed
        # (uint8 subtraction would wrap around).
        image = image.convert("RGBA")
        new_arr = np.array(image, dtype=np.int16)

        if self._canvas is None:
            self._canvas = (image.width, image.height)

        # First add(): materialise manual-mode baselines now that the canvas
        # size is known.
        if self._pending_baselines:
            self._inject_baselines(self._pending_baselines)
            self._pending_baselines = []

        frame_id = len(self._frames)

        frame_type, ref_id, patches = self._encode_frame(image, frame_id, new_arr)

        meta: dict = {}
        if name:
            meta["name"] = name
        if metadata:
            meta.update(metadata)

        # Original per-frame size is recorded so readers can restore frames
        # whose dimensions differ from the canvas.
        meta["_size"] = [image.width, image.height]
        meta_bytes = json.dumps(meta, separators=(",", ":")).encode() if meta else b""

        # Payload layout: [patch header + data] * patch_count, then metadata.
        parts: list[bytes] = []
        for (x, y, w, h, data) in patches:
            parts.append(pack_patch_header(x, y, w, h, len(data)))
            parts.append(data)
        parts.append(meta_bytes)
        frame_bytes = b"".join(parts)

        if frame_type == FRAME_KEYFRAME:
            self._depths.append(0)
        else:
            self._depths.append(self._depths[ref_id] + 1)

        self._frames.append({
            "patch_count": len(patches),
            "ref_frame_id": ref_id,
            "metadata_len": len(meta_bytes),
            "frame_type": frame_type,
        })
        self._frame_data.append(frame_bytes)
        self._ref_images.append(image)
        self._ref_arrays.append(new_arr)
        size_key = (image.width, image.height)
        self._size_groups.setdefault(size_key, []).append(frame_id)

        # Readers index content frames only, so hidden baselines are excluded.
        return frame_id - self._baseline_count

    def close(self):
        """Finalise and write the file.  Called automatically by __exit__."""
        if self._closed:
            return
        if not self._frames:
            raise RuntimeError("No frames added — nothing to write")

        # All pool work is consumed synchronously inside add(), so nothing is
        # pending here; wait=False just tears the pool down.
        self._pool.shutdown(wait=False)

        w, h = self._canvas
        frame_count = len(self._frames)

        # Layout: file header, then fixed-size index entries, then payloads.
        index_offset = FILE_HEADER_SIZE
        data_start = index_offset + frame_count * FRAME_INDEX_ENTRY_SIZE
        offsets: list[int] = []
        cursor = data_start
        for fd in self._frame_data:
            offsets.append(cursor)
            cursor += len(fd)

        self.path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.path, "wb") as f:
            f.write(pack_file_header(
                self.mode_id, frame_count, index_offset, w, h,
                codec=CODEC_FROM_NAME[self.codec],
                quality=self.quality,
            ))
            for i, fm in enumerate(self._frames):
                f.write(pack_index_entry(
                    data_offset=offsets[i],
                    patch_count=fm["patch_count"],
                    ref_frame_id=fm["ref_frame_id"],
                    metadata_len=fm["metadata_len"],
                    frame_type=fm["frame_type"],
                ))
            for fd in self._frame_data:
                f.write(fd)

        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Write the file only on a clean exit; on error, discard everything.
        if exc_type is None:
            self.close()
        else:
            self._pool.shutdown(wait=False)
            self._closed = True
        return False

    @property
    def frame_count(self) -> int:
        """Total frames including hidden baselines."""
        return len(self._frames)

    @property
    def content_frame_count(self) -> int:
        """Content frames only (what callers see)."""
        return len(self._frames) - self._baseline_count

    @property
    def baseline_count(self) -> int:
        """Number of hidden baseline keyframes stored at the start."""
        return self._baseline_count

    @property
    def stats(self) -> list[dict]:
        """Per-frame stats including hidden baseline frames."""
        result = []
        for i, (fm, fd) in enumerate(zip(self._frames, self._frame_data)):
            is_baseline = i in self._baseline_ids
            result.append({
                "frame_id": i,
                "content_id": None if is_baseline else i - self._baseline_count,
                "frame_type": "keyframe" if fm["frame_type"] == FRAME_KEYFRAME else "delta",
                "ref_frame_id": fm["ref_frame_id"],
                "patch_count": fm["patch_count"],
                "data_bytes": len(fd),
                "is_baseline": is_baseline,
            })
        return result

    def _inject_baselines(self, baselines: list):
        """
        Store each baseline as a hidden leading FRAME_KEYFRAME.

        Called from the first add() so self._canvas is already set.
        Baselines are resized to the canvas if their native size differs.
        """
        for idx, raw in enumerate(baselines):
            if isinstance(raw, (str, Path)):
                img = Image.open(raw).convert("RGBA")
            else:
                img = raw.convert("RGBA")

            # NOTE(review): LANCZOS resampling chosen here; confirm it is
            # acceptable for diff baselines (introduces resampled pixels).
            if img.size != self._canvas:
                img = img.resize(self._canvas, Image.LANCZOS)

            frame_id = len(self._frames)
            cw, ch = self._canvas
            # Baseline = one full-canvas patch, i.e. a keyframe payload.
            compressed = _encode_crop(img, 0, 0, cw, ch, codec=self.codec, quality=self.quality)

            meta_bytes = json.dumps(
                {"_baseline": True, "baseline_index": idx},
                separators=(",", ":"),
            ).encode()

            parts: list[bytes] = [
                pack_patch_header(0, 0, cw, ch, len(compressed)),
                compressed,
                meta_bytes,
            ]
            frame_bytes = b"".join(parts)

            # Keyframes reference themselves (ref_frame_id == frame_id).
            self._depths.append(0)
            self._frames.append({
                "patch_count": 1,
                "ref_frame_id": frame_id,
                "metadata_len": len(meta_bytes),
                "frame_type": FRAME_KEYFRAME,
            })
            self._frame_data.append(frame_bytes)
            self._ref_images.append(img)
            self._ref_arrays.append(np.array(img, dtype=np.int16))
            self._baseline_ids.append(frame_id)
            # NOTE(review): baselines are not added to self._size_groups, so
            # _encode_frame's same-size lookup never sees them and the first
            # content frame always becomes a keyframe without probing the
            # baselines — confirm this is intended.

        self._baseline_count = len(self._baseline_ids)

    def _encode_frame(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """Route to the appropriate encoder. Returns (frame_type, ref_id, patches)."""
        # Only frames with identical dimensions can serve as diff references.
        size_key = (image.width, image.height)
        same_size_ids = [fid for fid in self._size_groups.get(size_key, [])
                         if fid < frame_id]
        if not same_size_ids and frame_id > 0:
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_MANUAL:
            return self._encode_frame_manual(image, frame_id, new_arr)

        # Very first frame has nothing to diff against.
        if frame_id == 0:
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_LTO:
            return self._encode_frame_lto(image, frame_id, new_arr, same_size_ids)

        if self.mode_id == MODE_KEYFRAME and (frame_id % self.keyframe_interval == 0):
            return self._make_keyframe(image, frame_id)

        if self.mode_id == MODE_VS_FIRST:
            ref_id = same_size_ids[0]
        elif self.mode_id == MODE_VS_PRIOR:
            ref_id = same_size_ids[-1]
        else:
            # Defensive default: behave like vs_prior for any other mode id.
            ref_id = same_size_ids[-1]

        return self._diff_against(image, frame_id, ref_id, new_arr)

    def _ref_chain_depth(self, frame_id: int) -> int:
        """Length of the decode chain from *frame_id* back to its keyframe."""
        return self._depths[frame_id]

    def _make_keyframe(self, image: Image.Image, frame_id: int):
        """Encode *image* as a single full-size patch; ref_id is itself."""
        w, h = image.size
        compressed = _encode_crop(image, 0, 0, w, h, codec=self.codec, quality=self.quality)
        return FRAME_KEYFRAME, frame_id, [(0, 0, w, h, compressed)]

    def _diff_against(self, image: Image.Image, frame_id: int, ref_id: int, new_arr: np.ndarray):
        """Delta-encode against a fixed reference, with auto-keyframe escape."""
        if self.auto_keyframe_sim > 0:
            # Per-pixel max channel delta; if too few pixels are "unchanged",
            # a keyframe is cheaper/safer than a huge patch set.
            ref_arr = self._ref_arrays[ref_id]
            diff_arr = np.abs(new_arr - ref_arr).max(axis=2)
            total = ref_arr.shape[0] * ref_arr.shape[1]
            if (diff_arr <= self.threshold).sum() / total < self.auto_keyframe_sim:
                return self._make_keyframe(image, frame_id)

        patches = compute_patches(
            self._ref_images[ref_id], image,
            threshold=self.threshold,
            tile_size=self.tile_size,
            merge_gap=self.merge_gap,
            codec=self.codec,
            quality=self.quality,
            workers=self.workers,
            ref_arr=self._ref_arrays[ref_id],
            new_arr=new_arr,
        )
        return FRAME_DELTA, ref_id, patches

    def _encode_frame_lto(self, image: Image.Image, frame_id: int,
                          new_arr: np.ndarray,
                          same_size_ids: Optional[list] = None):
        """LTO encoder: score all references, probe the top few, keep the smallest."""
        total_px = new_arr.shape[0] * new_arr.shape[1]

        if same_size_ids is None:
            size_key = (image.width, image.height)
            same_size_ids = [fid for fid in self._size_groups.get(size_key, [])
                             if fid < frame_id]

        # Cap the decode-chain length readers would have to follow.
        eligible = [
            cid for cid in same_size_ids
            if self._depths[cid] + 1 <= self.max_ref_depth
        ]
        if not eligible:
            return self._make_keyframe(image, frame_id)

        # Stage 1: cheap similarity score for every eligible reference.
        def _score(cid: int):
            diff = np.abs(new_arr - self._ref_arrays[cid]).max(axis=2)
            sim = float((diff <= self.threshold).sum()) / total_px
            return sim, cid

        if len(eligible) <= 2 or self.workers == 1:
            scored = [_score(cid) for cid in eligible]
        else:
            scored = list(self._pool.map(_score, eligible))

        scored.sort(key=lambda x: -x[0])
        candidates = [cid for (_, cid) in scored[: self.lto_candidates]]

        # Stage 2: full patch computation for the top candidates only
        # (workers=1 inside because the probes themselves are parallelised).
        def _probe(cid: int):
            patches = compute_patches(
                self._ref_images[cid], image,
                threshold=self.threshold,
                tile_size=self.tile_size,
                merge_gap=self.merge_gap,
                codec=self.codec,
                quality=self.quality,
                workers=1,
                ref_arr=self._ref_arrays[cid],
                new_arr=new_arr,
            )
            return cid, patches, sum(len(p[4]) for p in patches)

        if len(candidates) <= 1 or self.workers == 1:
            results = [_probe(cid) for cid in candidates]
        else:
            results = list(self._pool.map(_probe, candidates))

        best_ref_id, best_patches, best_size = min(results, key=lambda r: r[2])

        # A keyframe beats the best delta when its payload is no larger.
        # NOTE(review): _make_keyframe re-encodes the image, so the keyframe
        # payload is produced twice on this path.
        iw, ih = image.size
        kf_data = _encode_crop(image, 0, 0, iw, ih, codec=self.codec, quality=self.quality)
        if len(kf_data) <= best_size:
            return self._make_keyframe(image, frame_id)

        return FRAME_DELTA, best_ref_id, best_patches

    def _encode_frame_manual(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """
        Manual-mode encoder: probe designated baseline frames first, then fall
        back to the configured fallback mode if no baseline wins.
        """
        total_px = new_arr.shape[0] * new_arr.shape[1]

        # Cheap similarity pre-ranking of the baselines (same as LTO stage 1).
        def _score_baseline(bid: int):
            diff = np.abs(new_arr - self._ref_arrays[bid]).max(axis=2)
            sim = float((diff <= self.threshold).sum()) / total_px
            return sim, bid

        if len(self._baseline_ids) <= 2 or self.workers == 1:
            scored = [_score_baseline(bid) for bid in self._baseline_ids]
        else:
            scored = list(self._pool.map(_score_baseline, self._baseline_ids))

        scored.sort(key=lambda x: -x[0])
        top_baselines = [bid for (_, bid) in scored[: self.lto_candidates]]

        def _probe_baseline(bid: int):
            patches = compute_patches(
                self._ref_images[bid], image,
                threshold=self.threshold,
                tile_size=self.tile_size,
                merge_gap=self.merge_gap,
                codec=self.codec,
                quality=self.quality,
                workers=1,
                ref_arr=self._ref_arrays[bid],
                new_arr=new_arr,
            )
            return bid, patches, sum(len(p[4]) for p in patches)

        if len(top_baselines) <= 1 or self.workers == 1:
            baseline_results = [_probe_baseline(bid) for bid in top_baselines]
        else:
            baseline_results = list(self._pool.map(_probe_baseline, top_baselines))

        best_bl_ref, best_bl_patches, best_bl_size = min(baseline_results, key=lambda r: r[2])

        # NOTE(review): the fallback encoding and the keyframe encoding are
        # always computed, even when a baseline clearly wins — this is a
        # three-way size contest, at full encoding cost per frame.
        fb_type, fb_ref_id, fb_patches = self._encode_frame_fallback(image, frame_id, new_arr)
        fb_size = sum(len(p[4]) for p in fb_patches)

        iw, ih = image.size
        kf_size = len(_encode_crop(image, 0, 0, iw, ih, codec=self.codec, quality=self.quality))

        if kf_size <= best_bl_size and kf_size <= fb_size:
            return self._make_keyframe(image, frame_id)
        if best_bl_size <= fb_size:
            return FRAME_DELTA, best_bl_ref, best_bl_patches
        return fb_type, fb_ref_id, fb_patches

    def _encode_frame_fallback(self, image: Image.Image, frame_id: int, new_arr: np.ndarray):
        """
        Run the configured fallback_mode for this frame.
        """
        # content_id ignores the hidden baseline frames at the file start.
        content_id = frame_id - self._baseline_count

        if self.fallback_mode_id == MODE_VS_FIRST:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            # First content frame sits right after the baselines.
            ref_id = self._baseline_count
            return self._diff_against(image, frame_id, ref_id, new_arr)

        if self.fallback_mode_id == MODE_VS_PRIOR:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            return self._diff_against(image, frame_id, frame_id - 1, new_arr)

        if self.fallback_mode_id == MODE_KEYFRAME:
            if content_id == 0 or (content_id % self.keyframe_interval == 0):
                return self._make_keyframe(image, frame_id)
            return self._diff_against(image, frame_id, frame_id - 1, new_arr)

        if self.fallback_mode_id == MODE_LTO:
            if content_id == 0:
                return self._make_keyframe(image, frame_id)
            return self._encode_frame_lto(image, frame_id, new_arr)

        # Unknown fallback id: safest possible frame is a keyframe.
        return self._make_keyframe(image, frame_id)

Sequential writer for .impak collections.

Parameters
  • path (output file path):

  • mode ("vs_first" | "vs_prior" | "keyframe" | "lto" | "manual"):

  • keyframe_interval ((keyframe mode only) store a full image every N frames):

  • threshold (pixel delta considered "changed" (0 = perfectly lossless)):

  • tile_size (diff grid cell size in pixels):

  • merge_gap (pixels gap between changed tiles before they are merged):

  • auto_keyframe_sim (if similarity drops below this fraction a keyframe is): forced regardless of mode (0.0 = never force)

  • workers (thread-pool size used for parallel patch compression and): LTO candidate probing. None = os.cpu_count(). Set to 1 to disable parallelism entirely.
  • baselines ((manual mode) list of PIL Images or file paths used as): pinned reference anchors. They are stored as hidden leading keyframes so the delta chain is self-contained, but ImpakReader hides them from callers — only content frames are visible when iterating or indexing. Baselines are automatically resized to match the canvas size established by the first content frame (or the first baseline if all are the same size).
  • fallback_mode ((manual mode) diff strategy applied when no baseline): yields a patch set smaller than this alternative. Accepts "vs_first", "vs_prior", "keyframe", or "lto". Defaults to "lto".
ImpakWriter( path: Union[str, pathlib._local.Path], mode: str = 'vs_first', keyframe_interval: int = 10, threshold: int = 4, tile_size: int = 64, merge_gap: int = 8, auto_keyframe_sim: float = 0.5, codec: str = 'webp', quality: int = 100, lto_candidates: int = 6, max_ref_depth: int = 8, workers: Optional[int] = None, baselines: Optional[List[Union[PIL.Image.Image, str, pathlib._local.Path]]] = None, fallback_mode: str = 'lto')
    def __init__(
        self,
        path: Union[str, Path],
        mode: str = "vs_first",
        keyframe_interval: int = 10,
        threshold: int = 4,
        tile_size: int = 64,
        merge_gap: int = 8,
        auto_keyframe_sim: float = 0.5,
        codec: str = "webp",
        quality: int = 100,
        lto_candidates: int = 6,
        max_ref_depth: int = 8,
        workers: Optional[int] = None,
        baselines: Optional[List[Union[Image.Image, str, Path]]] = None,
        fallback_mode: str = "lto",
    ):
        """Validate configuration and initialise in-memory frame state."""
        if mode not in MODE_FROM_NAME:
            raise ValueError(f"mode must be one of {list(MODE_FROM_NAME)}")
        if codec not in ("png", "webp"):
            raise ValueError("codec must be 'png' or 'webp'")
        if not (0 <= quality <= 100):
            raise ValueError("quality must be 0-100")
        if lto_candidates < 1:
            raise ValueError("lto_candidates must be >= 1")
        if max_ref_depth < 1:
            raise ValueError("max_ref_depth must be >= 1")

        if mode == "manual":
            # "manual" cannot be its own fallback — that would recurse.
            _valid_fallbacks = [m for m in MODE_FROM_NAME if m != "manual"]
            if fallback_mode not in _valid_fallbacks:
                raise ValueError(
                    f"fallback_mode must be one of {_valid_fallbacks}"
                )
            if not baselines:
                raise ValueError(
                    "manual mode requires at least one entry in 'baselines'"
                )

        self.path = Path(path)
        self.mode = mode
        self.mode_id = MODE_FROM_NAME[mode]
        self.keyframe_interval = keyframe_interval
        self.threshold = threshold
        self.tile_size = tile_size
        self.merge_gap = merge_gap
        self.auto_keyframe_sim = auto_keyframe_sim
        self.codec = codec
        self.quality = quality
        self.lto_candidates = lto_candidates
        self.max_ref_depth = max_ref_depth
        self.workers = workers
        self.fallback_mode = fallback_mode
        self.fallback_mode_id = MODE_FROM_NAME.get(fallback_mode, MODE_LTO)

        # Per-frame bookkeeping: parallel lists indexed by frame_id.
        self._frames: list[dict] = []
        self._frame_data: list[bytes] = []
        self._ref_images: list[Image.Image] = []
        self._ref_arrays: list[np.ndarray] = []
        self._depths: list[int] = []


        # Canvas (w, h) is fixed by the first image added.
        self._canvas: Optional[tuple[int, int]] = None
        self._size_groups: dict[tuple[int, int], list[int]] = {}
        self._closed = False

        # Manual-mode baselines are injected lazily on the first add().
        self._baseline_ids: list[int] = []
        self._baseline_count: int = 0
        self._pending_baselines: list = list(baselines) if baselines else []

        _pool_size = workers if workers is not None else (os.cpu_count() or 4)
        self._pool = ThreadPoolExecutor(max_workers=max(1, _pool_size))
path
mode
mode_id
keyframe_interval
threshold
tile_size
merge_gap
auto_keyframe_sim
codec
quality
lto_candidates
max_ref_depth
workers
fallback_mode
fallback_mode_id
def add( self, image: Union[PIL.Image.Image, str, pathlib.Path], name: Optional[str] = None, metadata: Optional[dict] = None) -> int:
167    def add(
168        self,
169        image: Union[Image.Image, str, Path],
170        name: Optional[str] = None,
171        metadata: Optional[dict] = None,
172    ) -> int:
173        """
174        Add one content image to the collection.  Returns the 0-based frame
175        index as seen by readers (i.e. not counting hidden baseline frames).
176
177        *image* may be a Pillow Image or a file path.
178        *name* is stored in per-frame metadata (useful for later retrieval).
179        *metadata* is an arbitrary JSON-serialisable dict merged with name.
180        """
181        if self._closed:
182            raise RuntimeError("Writer is already closed")
183
184        if isinstance(image, (str, Path)):
185            image = Image.open(image)
186
187        image = image.convert("RGBA")
188        new_arr = np.array(image, dtype=np.int16)
189
190        if self._canvas is None:
191            self._canvas = (image.width, image.height)
192
193        if self._pending_baselines:
194            self._inject_baselines(self._pending_baselines)
195            self._pending_baselines = []
196
197        frame_id = len(self._frames)
198
199        frame_type, ref_id, patches = self._encode_frame(image, frame_id, new_arr)
200
201        meta: dict = {}
202        if name:
203            meta["name"] = name
204        if metadata:
205            meta.update(metadata)
206
207        meta["_size"] = [image.width, image.height]
208        meta_bytes = json.dumps(meta, separators=(",", ":")).encode() if meta else b""
209
210        parts: list[bytes] = []
211        for (x, y, w, h, data) in patches:
212            parts.append(pack_patch_header(x, y, w, h, len(data)))
213            parts.append(data)
214        parts.append(meta_bytes)
215        frame_bytes = b"".join(parts)
216
217        if frame_type == FRAME_KEYFRAME:
218            self._depths.append(0)
219        else:
220            self._depths.append(self._depths[ref_id] + 1)
221
222        self._frames.append({
223            "patch_count": len(patches),
224            "ref_frame_id": ref_id,
225            "metadata_len": len(meta_bytes),
226            "frame_type": frame_type,
227        })
228        self._frame_data.append(frame_bytes)
229        self._ref_images.append(image)
230        self._ref_arrays.append(new_arr)
231        size_key = (image.width, image.height)
232        self._size_groups.setdefault(size_key, []).append(frame_id)
233
234        return frame_id - self._baseline_count

Add one content image to the collection. Returns the 0-based frame index as seen by readers (i.e. not counting hidden baseline frames).

image may be a Pillow Image or a file path. name is stored in per-frame metadata (useful for later retrieval). metadata is an arbitrary JSON-serialisable dict merged with name.

def close(self):
236    def close(self):
237        """Finalise and write the file.  Called automatically by __exit__."""
238        if self._closed:
239            return
240        if not self._frames:
241            raise RuntimeError("No frames added — nothing to write")
242
243        self._pool.shutdown(wait=False)
244
245        w, h = self._canvas
246        frame_count = len(self._frames)
247
248        index_offset = FILE_HEADER_SIZE
249        data_start = index_offset + frame_count * FRAME_INDEX_ENTRY_SIZE
250        offsets: list[int] = []
251        cursor = data_start
252        for fd in self._frame_data:
253            offsets.append(cursor)
254            cursor += len(fd)
255
256        self.path.parent.mkdir(parents=True, exist_ok=True)
257
258        with open(self.path, "wb") as f:
259            f.write(pack_file_header(
260                self.mode_id, frame_count, index_offset, w, h,
261                codec=CODEC_FROM_NAME[self.codec],
262                quality=self.quality,
263            ))
264            for i, fm in enumerate(self._frames):
265                f.write(pack_index_entry(
266                    data_offset=offsets[i],
267                    patch_count=fm["patch_count"],
268                    ref_frame_id=fm["ref_frame_id"],
269                    metadata_len=fm["metadata_len"],
270                    frame_type=fm["frame_type"],
271                ))
272            for fd in self._frame_data:
273                f.write(fd)
274
275        self._closed = True

Finalise and write the file. Called automatically by __exit__.

frame_count: int
288    @property
289    def frame_count(self) -> int:
290        """Total frames including hidden baselines."""
291        return len(self._frames)

Total frames including hidden baselines.

content_frame_count: int
293    @property
294    def content_frame_count(self) -> int:
295        """Content frames only (what callers see)."""
296        return len(self._frames) - self._baseline_count

Content frames only (what callers see).

baseline_count: int
298    @property
299    def baseline_count(self) -> int:
300        return self._baseline_count
stats: list[dict]
302    @property
303    def stats(self) -> list[dict]:
304        """Per-frame stats including hidden baseline frames."""
305        result = []
306        for i, (fm, fd) in enumerate(zip(self._frames, self._frame_data)):
307            is_baseline = i in self._baseline_ids
308            result.append({
309                "frame_id": i,
310                "content_id": None if is_baseline else i - self._baseline_count,
311                "frame_type": "keyframe" if fm["frame_type"] == FRAME_KEYFRAME else "delta",
312                "ref_frame_id": fm["ref_frame_id"],
313                "patch_count": fm["patch_count"],
314                "data_bytes": len(fd),
315                "is_baseline": is_baseline,
316            })
317        return result

Per-frame stats including hidden baseline frames.

class ImpakReader:
 49class ImpakReader:
 50    """
 51    Random-access reader for .impak collections.
 52
 53    The file index is read at open time (small); pixel data is lazy-loaded
 54    per frame request.
 55
 56    In manual-mode files the leading baseline keyframes are automatically
 57    detected (via their {"_baseline": true} metadata) and excluded from all
 58    public-facing operations.  They are still used internally when decoding
 59    delta frames that reference them.
 60    """
 61
 62    def __init__(
 63            self,
 64            path: Union[str, Path],
 65            low_ram_mode: bool = False,
 66            cache_size: Optional[int] = None,
 67    ):
 68        self.path = Path(path)
 69        self._fh = open(self.path, "rb")
 70        self._header: dict = {}
 71        self._index: list[dict] = []
 72        self._name_map: dict[str, int] = {}  # name → content_id
 73        self._meta_cache: dict[int, dict] = {}
 74
 75        self.low_ram_mode = low_ram_mode
 76        if cache_size is None:
 77            cache_size = 2 if low_ram_mode else 0
 78        self.cache_size = max(0, cache_size)
 79
 80        # abs_id -> Image.Image
 81        # normal mode: unlimited dict-like cache
 82        # low_ram_mode: bounded LRU cache
 83        self._decode_cache: "OrderedDict[int, Image.Image]" = OrderedDict()
 84
 85        self._read_header()
 86        self._read_index()
 87
 88        self._baseline_count: int = self._detect_baseline_count()
 89
 90    def __enter__(self):
 91        return self
 92
 93    def __exit__(self, *_):
 94        self.close()
 95
 96    def close(self):
 97        if self._fh and not self._fh.closed:
 98            self._fh.close()
 99
100    def __len__(self) -> int:
101        """Number of content frames (baseline frames are not counted)."""
102        return self._header["frame_count"] - self._baseline_count
103
104    def __getitem__(self, key: Union[int, str]) -> Image.Image:
105        """
106        Retrieve a content frame by index or name.
107
108        Integer indices are relative to content frames only (0 = first content
109        frame, regardless of how many baselines precede it).
110        """
111        if isinstance(key, str):
112            self._build_name_map()
113            if key not in self._name_map:
114                raise KeyError(f"Frame name '{key}' not found in collection")
115            abs_id = self._name_map[key]
116            return self._decode_frame(abs_id)
117
118        if key < 0:
119            key = len(self) + key
120        if not (0 <= key < len(self)):
121            raise IndexError(f"Frame index {key} out of range (0..{len(self)-1})")
122        return self._decode_frame(self._content_to_abs(key))
123
124    def __iter__(self) -> Iterator[Image.Image]:
125        for i in range(len(self)):
126            yield self._decode_frame(self._content_to_abs(i))
127
128    def _cache_get(self, abs_id: int) -> Optional[Image.Image]:
129        img = self._decode_cache.get(abs_id)
130        if img is None:
131            return None
132        # LRU touch
133        self._decode_cache.move_to_end(abs_id)
134        return img.copy()
135
136    def _cache_put(self, abs_id: int, img: Image.Image) -> None:
137        if self.cache_size == 0:
138            return
139
140        self._decode_cache[abs_id] = img.copy()
141        self._decode_cache.move_to_end(abs_id)
142
143        if self.low_ram_mode:
144            while len(self._decode_cache) > self.cache_size:
145                self._decode_cache.popitem(last=False)
146
147    def get_metadata(self, frame_id: int) -> dict:
148        """
149        Return the metadata dict for a content frame by content index.
150        """
151        return self._read_metadata(self._content_to_abs(frame_id))
152
153    def info(self) -> str:
154        """Human-readable summary of the collection."""
155        h = self._header
156        mode_name = MODE_NAMES.get(h["diff_mode"], "unknown")
157        codec_name = CODEC_NAMES.get(h.get("codec", 0), "png")
158        quality = h.get("quality", 100)
159        quality_str = "lossless" if quality == 100 else f"quality={quality}"
160        total_bytes = self.path.stat().st_size
161        n_content = len(self)
162        lines = [
163            f"File    : {self.path}",
164            f"Format  : impak v{h['version']}",
165            f"Diff    : {mode_name}",
166            f"Codec   : {codec_name} ({quality_str})",
167            f"Canvas  : {h['width']} × {h['height']} px",
168            f"Frames  : {n_content}"
169            + (f"  ({self._baseline_count} hidden baseline(s))" if self._baseline_count else ""),
170            f"Size    : {total_bytes:,} bytes ({total_bytes/1024:.1f} KB)",
171            "",
172        ]
173
174        frame_sizes = {self._frame_size(self._content_to_abs(i)) for i in range(n_content)}
175        mixed_sizes = len(frame_sizes) > 1
176
177        if mixed_sizes:
178            lines += [
179                f"{'ID':>4}  {'Type':>9}  {'Ref':>4}  {'Patches':>7}  {'Bytes':>9}  {'Size':>11}  Name",
180                "─" * 72,
181            ]
182        else:
183            lines += [
184                f"{'ID':>4}  {'Type':>9}  {'Ref':>4}  {'Patches':>7}  {'Bytes':>9}  Name",
185                "─" * 64,
186            ]
187
188        for content_id in range(n_content):
189            abs_id = self._content_to_abs(content_id)
190            entry = self._index[abs_id]
191            ftype = "keyframe" if entry["frame_type"] == FRAME_KEYFRAME else "delta"
192            ref_abs = entry["ref_frame_id"]
193            if entry["frame_type"] == FRAME_DELTA:
194                if ref_abs < self._baseline_count:
195                    ref = f"B{ref_abs}"
196                else:
197                    ref = str(ref_abs - self._baseline_count)
198            else:
199                ref = "self"
200            data_bytes = self._frame_data_size(abs_id)
201            meta = self._read_metadata(abs_id)
202            name = meta.get("name", "")
203            if mixed_sizes:
204                fw, fh = self._frame_size(abs_id)
205                size_str = f"{fw}×{fh}"
206                lines.append(
207                    f"{content_id:>4}  {ftype:>9}  {ref:>4}  "
208                    f"{entry['patch_count']:>7}  {data_bytes:>9,}  {size_str:>11}  {name}"
209                )
210            else:
211                lines.append(
212                    f"{content_id:>4}  {ftype:>9}  {ref:>4}  "
213                    f"{entry['patch_count']:>7}  {data_bytes:>9,}  {name}"
214                )
215        return "\n".join(lines)
216
217    def diff_map(self, frame_id: int) -> list[tuple[int, int, int, int]]:
218        """
219        Return the (x, y, w, h) patch rectangles for a content frame.
220        *frame_id* is a content-visible index.
221        """
222        abs_id = self._content_to_abs(frame_id)
223        entry = self._index[abs_id]
224        if entry["frame_type"] == FRAME_KEYFRAME:
225            w, h = self._frame_size(abs_id)
226            return [(0, 0, w, h)]
227        patches = self._read_patches(abs_id)
228        return [(x, y, w, h) for (x, y, w, h, _) in patches]
229
230    @property
231    def canvas_size(self) -> tuple[int, int]:
232        return self._header["width"], self._header["height"]
233
234    @property
235    def mode(self) -> str:
236        return MODE_NAMES.get(self._header["diff_mode"], "unknown")
237
238    @property
239    def baseline_count(self) -> int:
240        """Number of hidden baseline frames (0 for non-manual files)."""
241        return self._baseline_count
242
243    def _frame_size(self, abs_id: int) -> tuple[int, int]:
244        """
245        Return (width, height) for a frame.
246
247        For files written with multi-size support the per-frame size is stored
248        in JSON metadata under "_size".  For older single-size files (or frames
249        that pre-date the feature) we fall back to the global canvas dimensions.
250        """
251        meta = self._read_metadata(abs_id)
252        if "_size" in meta:
253            return tuple(meta["_size"])
254        return self._header["width"], self._header["height"]
255
256    def _content_to_abs(self, content_id: int) -> int:
257        """Convert a public content index to an absolute frame index."""
258        return content_id + self._baseline_count
259
260    def _decode_frame(self, abs_id: int, _memo: Optional[dict[int, Image.Image]] = None) -> Image.Image:
261        """Decode frame by absolute index (works for baselines too)."""
262        if _memo is None:
263            _memo = {}
264
265        if abs_id in _memo:
266            return _memo[abs_id].copy()
267
268        cached = self._cache_get(abs_id)
269        if cached is not None:
270            _memo[abs_id] = cached
271            return cached.copy()
272
273        entry = self._index[abs_id]
274        codec = self.codec
275
276        if entry["frame_type"] == FRAME_KEYFRAME:
277            patches = self._read_patches(abs_id)
278            assert len(patches) == 1
279            img = self._decompress_patch_to_image(patches[0][4], codec)
280        else:
281            ref_id = entry["ref_frame_id"]
282            ref_img = self._decode_frame(ref_id, _memo=_memo)
283            patches = self._read_patches(abs_id)
284            img = reconstruct(ref_img, patches, codec=codec)
285
286        img = img.convert("RGBA")
287        _memo[abs_id] = img
288        self._cache_put(abs_id, img)
289        return img.copy()
290
291    def _read_header(self):
292        self._fh.seek(0)
293        raw = self._fh.read(FILE_HEADER_SIZE)
294        if len(raw) < FILE_HEADER_SIZE:
295            raise ValueError("File too small to be a valid .impak archive")
296        h = unpack_file_header(raw)
297        if h["magic"] != MAGIC:
298            raise ValueError("Not a valid .impak file (bad magic bytes)")
299        self._header = h
300
301    def _read_index(self):
302        self._fh.seek(self._header["index_offset"])
303        count = self._header["frame_count"]
304        self._index = []
305        for _ in range(count):
306            raw = self._fh.read(FRAME_INDEX_ENTRY_SIZE)
307            self._index.append(unpack_index_entry(raw))
308
309    def _read_patches(self, abs_id: int) -> list:
310        entry = self._index[abs_id]
311        self._fh.seek(entry["data_offset"])
312        patches = []
313        for _ in range(entry["patch_count"]):
314            hdr = self._fh.read(PATCH_HEADER_SIZE)
315            x, y, w, h, data_len = unpack_patch_header(hdr)
316            data = self._fh.read(data_len)
317            patches.append((x, y, w, h, data))
318        return patches
319
320    def _read_metadata(self, abs_id: int) -> dict:
321        if abs_id in self._meta_cache:
322            return self._meta_cache[abs_id]
323        entry = self._index[abs_id]
324        meta_len = entry["metadata_len"]
325        if meta_len == 0:
326            self._meta_cache[abs_id] = {}
327            return {}
328        self._fh.seek(entry["data_offset"])
329        for _ in range(entry["patch_count"]):
330            hdr = self._fh.read(PATCH_HEADER_SIZE)
331            _, _, _, _, data_len = unpack_patch_header(hdr)
332            self._fh.seek(data_len, 1)
333        raw = self._fh.read(meta_len)
334        try:
335            meta = json.loads(raw.decode())
336        except Exception:
337            meta = {}
338        self._meta_cache[abs_id] = meta
339        return meta
340
341    def _frame_data_size(self, abs_id: int) -> int:
342        entry = self._index[abs_id]
343        total = self._header["frame_count"]
344        if abs_id + 1 < total:
345            next_off = self._index[abs_id + 1]["data_offset"]
346        else:
347            next_off = self.path.stat().st_size
348        return next_off - entry["data_offset"]
349
350    def _decompress_patch_to_image(self, compressed: bytes, codec: str = "png") -> Image.Image:
351        from .differ import _decode_patch
352        return _decode_patch(compressed, codec=codec)
353
354    def _detect_baseline_count(self) -> int:
355        """
356        Scan leading frames for the {"_baseline": true} metadata tag.
357        Returns the count of consecutive baseline frames at the front.
358        This is how the reader discovers how many frames to hide, without
359        any binary format change.
360        """
361        count = 0
362        total = self._header["frame_count"]
363        for abs_id in range(total):
364            meta = self._read_metadata(abs_id)
365            if meta.get("_baseline"):
366                count += 1
367            else:
368                break   # baselines are always contiguous at the front
369        return count
370
371    @property
372    def codec(self) -> str:
373        return CODEC_NAMES.get(self._header.get("codec", 0), "png")
374
375    @property
376    def quality(self) -> int:
377        return self._header.get("quality", 100)
378
379    def _build_name_map(self):
380        if self._name_map:
381            return
382        for content_id in range(len(self)):
383            abs_id = self._content_to_abs(content_id)
384            meta = self._read_metadata(abs_id)
385            if "name" in meta:
386                self._name_map[meta["name"]] = abs_id
387
388    @classmethod
389    def load_all(
390            cls,
391            path: Union[str, Path],
392            low_ram_mode: bool = False,
393            cache_size: Optional[int] = None,
394    ) -> List[Image.Image]:
395        """Load every content frame and return as a list of PIL Images."""
396        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
397            return list(r)
398
399    @classmethod
400    def load_frame(
401            cls,
402            path: Union[str, Path],
403            frame_id: int,
404            low_ram_mode: bool = False,
405            cache_size: Optional[int] = None,
406    ) -> Image.Image:
407        """Load a single content frame by content index."""
408        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
409            return r[frame_id]

Random-access reader for .impak collections.

The file index is read at open time (small); pixel data is lazy-loaded per frame request.

In manual-mode files the leading baseline keyframes are automatically detected (via their {"_baseline": true} metadata) and excluded from all public-facing operations. They are still used internally when decoding delta frames that reference them.

ImpakReader( path: Union[str, pathlib.Path], low_ram_mode: bool = False, cache_size: Optional[int] = None)
62    def __init__(
63            self,
64            path: Union[str, Path],
65            low_ram_mode: bool = False,
66            cache_size: Optional[int] = None,
67    ):
68        self.path = Path(path)
69        self._fh = open(self.path, "rb")
70        self._header: dict = {}
71        self._index: list[dict] = []
72        self._name_map: dict[str, int] = {}  # name → content_id
73        self._meta_cache: dict[int, dict] = {}
74
75        self.low_ram_mode = low_ram_mode
76        if cache_size is None:
77            cache_size = 2 if low_ram_mode else 0
78        self.cache_size = max(0, cache_size)
79
80        # abs_id -> Image.Image
81        # normal mode: unlimited dict-like cache
82        # low_ram_mode: bounded LRU cache
83        self._decode_cache: "OrderedDict[int, Image.Image]" = OrderedDict()
84
85        self._read_header()
86        self._read_index()
87
88        self._baseline_count: int = self._detect_baseline_count()
path
low_ram_mode
cache_size
def close(self):
96    def close(self):
97        if self._fh and not self._fh.closed:
98            self._fh.close()
def get_metadata(self, frame_id: int) -> dict:
147    def get_metadata(self, frame_id: int) -> dict:
148        """
149        Return the metadata dict for a content frame by content index.
150        """
151        return self._read_metadata(self._content_to_abs(frame_id))

Return the metadata dict for a content frame by content index.

def info(self) -> str:
153    def info(self) -> str:
154        """Human-readable summary of the collection."""
155        h = self._header
156        mode_name = MODE_NAMES.get(h["diff_mode"], "unknown")
157        codec_name = CODEC_NAMES.get(h.get("codec", 0), "png")
158        quality = h.get("quality", 100)
159        quality_str = "lossless" if quality == 100 else f"quality={quality}"
160        total_bytes = self.path.stat().st_size
161        n_content = len(self)
162        lines = [
163            f"File    : {self.path}",
164            f"Format  : impak v{h['version']}",
165            f"Diff    : {mode_name}",
166            f"Codec   : {codec_name} ({quality_str})",
167            f"Canvas  : {h['width']} × {h['height']} px",
168            f"Frames  : {n_content}"
169            + (f"  ({self._baseline_count} hidden baseline(s))" if self._baseline_count else ""),
170            f"Size    : {total_bytes:,} bytes ({total_bytes/1024:.1f} KB)",
171            "",
172        ]
173
174        frame_sizes = {self._frame_size(self._content_to_abs(i)) for i in range(n_content)}
175        mixed_sizes = len(frame_sizes) > 1
176
177        if mixed_sizes:
178            lines += [
179                f"{'ID':>4}  {'Type':>9}  {'Ref':>4}  {'Patches':>7}  {'Bytes':>9}  {'Size':>11}  Name",
180                "─" * 72,
181            ]
182        else:
183            lines += [
184                f"{'ID':>4}  {'Type':>9}  {'Ref':>4}  {'Patches':>7}  {'Bytes':>9}  Name",
185                "─" * 64,
186            ]
187
188        for content_id in range(n_content):
189            abs_id = self._content_to_abs(content_id)
190            entry = self._index[abs_id]
191            ftype = "keyframe" if entry["frame_type"] == FRAME_KEYFRAME else "delta"
192            ref_abs = entry["ref_frame_id"]
193            if entry["frame_type"] == FRAME_DELTA:
194                if ref_abs < self._baseline_count:
195                    ref = f"B{ref_abs}"
196                else:
197                    ref = str(ref_abs - self._baseline_count)
198            else:
199                ref = "self"
200            data_bytes = self._frame_data_size(abs_id)
201            meta = self._read_metadata(abs_id)
202            name = meta.get("name", "")
203            if mixed_sizes:
204                fw, fh = self._frame_size(abs_id)
205                size_str = f"{fw}×{fh}"
206                lines.append(
207                    f"{content_id:>4}  {ftype:>9}  {ref:>4}  "
208                    f"{entry['patch_count']:>7}  {data_bytes:>9,}  {size_str:>11}  {name}"
209                )
210            else:
211                lines.append(
212                    f"{content_id:>4}  {ftype:>9}  {ref:>4}  "
213                    f"{entry['patch_count']:>7}  {data_bytes:>9,}  {name}"
214                )
215        return "\n".join(lines)

Human-readable summary of the collection.

def diff_map(self, frame_id: int) -> list[tuple[int, int, int, int]]:
217    def diff_map(self, frame_id: int) -> list[tuple[int, int, int, int]]:
218        """
219        Return the (x, y, w, h) patch rectangles for a content frame.
220        *frame_id* is a content-visible index.
221        """
222        abs_id = self._content_to_abs(frame_id)
223        entry = self._index[abs_id]
224        if entry["frame_type"] == FRAME_KEYFRAME:
225            w, h = self._frame_size(abs_id)
226            return [(0, 0, w, h)]
227        patches = self._read_patches(abs_id)
228        return [(x, y, w, h) for (x, y, w, h, _) in patches]

Return the (x, y, w, h) patch rectangles for a content frame. frame_id is a content-visible index.

canvas_size: tuple[int, int]
230    @property
231    def canvas_size(self) -> tuple[int, int]:
232        return self._header["width"], self._header["height"]
mode: str
234    @property
235    def mode(self) -> str:
236        return MODE_NAMES.get(self._header["diff_mode"], "unknown")
baseline_count: int
238    @property
239    def baseline_count(self) -> int:
240        """Number of hidden baseline frames (0 for non-manual files)."""
241        return self._baseline_count

Number of hidden baseline frames (0 for non-manual files).

codec: str
371    @property
372    def codec(self) -> str:
373        return CODEC_NAMES.get(self._header.get("codec", 0), "png")
quality: int
375    @property
376    def quality(self) -> int:
377        return self._header.get("quality", 100)
@classmethod
def load_all( cls, path: Union[str, pathlib.Path], low_ram_mode: bool = False, cache_size: Optional[int] = None) -> List[PIL.Image.Image]:
388    @classmethod
389    def load_all(
390            cls,
391            path: Union[str, Path],
392            low_ram_mode: bool = False,
393            cache_size: Optional[int] = None,
394    ) -> List[Image.Image]:
395        """Load every content frame and return as a list of PIL Images."""
396        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
397            return list(r)

Load every content frame and return as a list of PIL Images.

@classmethod
def load_frame( cls, path: Union[str, pathlib.Path], frame_id: int, low_ram_mode: bool = False, cache_size: Optional[int] = None) -> PIL.Image.Image:
399    @classmethod
400    def load_frame(
401            cls,
402            path: Union[str, Path],
403            frame_id: int,
404            low_ram_mode: bool = False,
405            cache_size: Optional[int] = None,
406    ) -> Image.Image:
407        """Load a single content frame by content index."""
408        with cls(path, low_ram_mode=low_ram_mode, cache_size=cache_size) as r:
409            return r[frame_id]

Load a single content frame by content index.

def compute_patches( ref_img: PIL.Image.Image, new_img: PIL.Image.Image, threshold: int = 4, tile_size: int = 32, merge_gap: int = 8, codec: str = 'png', quality: int = 100, workers: Optional[int] = None, ref_arr: Optional[numpy.ndarray] = None, new_arr: Optional[numpy.ndarray] = None) -> List[Tuple[int, int, int, int, bytes]]:
 75def compute_patches(
 76    ref_img: Image.Image,
 77    new_img: Image.Image,
 78    threshold: int = 4,
 79    tile_size: int = 32,
 80    merge_gap: int = 8,
 81    codec: str = "png",
 82    quality: int = 100,
 83    workers: Optional[int] = None,
 84    ref_arr: Optional[np.ndarray] = None,
 85    new_arr: Optional[np.ndarray] = None,
 86) -> List[Patch]:
 87    """
 88    Compare *new_img* against *ref_img* using a tile grid.
 89    Parameters
 90    ----------
 91    threshold  : per-channel absolute pixel delta that counts as "changed"
 92                 (use 0 for perfectly lossless; 3-6 absorbs JPEG noise)
 93    tile_size  : grid cell size in pixels (smaller = finer granularity but
 94                 more patch overhead; 16–64 is a good range)
 95    merge_gap  : changed tiles within this many pixels of each other on the
 96                 same row are merged into one patch (reduces patch count)
 97    workers    : thread-pool size for parallel patch compression.
 98                 None = use ThreadPoolExecutor default (cpu_count × 4 or so).
 99                 Set to 1 to disable parallelism entirely.
100
101    """
102    if ref_arr is None:
103        ref = np.array(ref_img.convert("RGBA"), dtype=np.int16)
104    else:
105        ref = ref_arr
106        if ref.dtype != np.int16:
107            ref = ref.astype(np.int16, copy=False)
108
109    if new_arr is None:
110        new = np.array(new_img.convert("RGBA"), dtype=np.int16)
111    else:
112        new = new_arr
113        if new.dtype != np.int16:
114            new = new.astype(np.int16, copy=False)
115
116    if ref.shape != new.shape:
117        raise ValueError(
118            f"Image dimensions differ: ref={ref.shape[:2]} new={new.shape[:2]}"
119        )
120
121    h, w = ref.shape[:2]
122    diff = np.abs(new - ref).max(axis=2)
123
124    cols = (w + tile_size - 1) // tile_size
125    rows = (h + tile_size - 1) // tile_size
126
127    pad_h = rows * tile_size - h
128    pad_w = cols * tile_size - w
129    if pad_h or pad_w:
130        diff_padded = np.pad(diff, ((0, pad_h), (0, pad_w)), constant_values=0)
131    else:
132        diff_padded = diff
133
134    tile_max = (
135        diff_padded
136        .reshape(rows, tile_size, cols, tile_size)
137        .max(axis=(1, 3))
138    )
139    changed_mask = tile_max > threshold
140    tile_rows, tile_cols = np.where(changed_mask)
141    changed_tiles = list(zip(tile_rows.tolist(), tile_cols.tolist()))
142
143    if not changed_tiles:
144        return []
145
146    rects = _tiles_to_rects(changed_tiles, tile_size, w, h)
147    merged = _merge_rects(rects, merge_gap, tile_size, w, h)
148
149    if len(merged) <= 2 or workers == 1:
150        patches: List[Patch] = [
151            (rx, ry, rw, rh, _encode_crop(new_img, rx, ry, rw, rh, codec=codec, quality=quality))
152            for (rx, ry, rw, rh) in merged
153        ]
154    else:
155        def _compress_rect(rect: tuple) -> Patch:
156            rx, ry, rw, rh = rect
157            return rx, ry, rw, rh, _encode_crop(new_img, rx, ry, rw, rh, codec=codec, quality=quality)
158
159        with ThreadPoolExecutor(max_workers=workers) as pool:
160            patches = list(pool.map(_compress_rect, merged))
161
162    return patches

Compare new_img against ref_img using a tile grid.

Parameters
  • threshold (per-channel absolute pixel delta that counts as "changed"): (use 0 for perfectly lossless; 3-6 absorbs JPEG noise)
  • tile_size (grid cell size in pixels (smaller = finer granularity but): more patch overhead; 16–64 is a good range)
  • merge_gap (changed tiles within this many pixels of each other on the): same row are merged into one patch (reduces patch count)
  • workers (thread-pool size for parallel patch compression.): None = use ThreadPoolExecutor default (cpu_count × 4 or so). Set to 1 to disable parallelism entirely.
def reconstruct( base_img: PIL.Image.Image, patches: List[Tuple[int, int, int, int, bytes]], codec: str = 'png') -> PIL.Image.Image:
def reconstruct(base_img: Image.Image, patches: List[Patch], codec: str = "png") -> Image.Image:
    """
    Apply a list of patches onto *base_img* and return a new Image.

    *base_img* is not modified in place.
    *codec* must match the codec used when the patches were encoded.
    """
    canvas = base_img.copy().convert("RGBA")
    for x, y, _w, _h, compressed in patches:
        decoded = _decode_patch(compressed, codec=codec).convert("RGBA")
        canvas.paste(decoded, (x, y))
    return canvas

Apply a list of patches onto base_img and return a new Image.

base_img is not modified in place. codec must match the codec used when the patches were encoded.

def similarity_score(img_a: PIL.Image.Image, img_b: PIL.Image.Image) -> float:
def similarity_score(img_a: Image.Image, img_b: Image.Image) -> float:
    """
    Return fraction of pixels that are identical (0.0 = completely different,
    1.0 = identical).  Useful for deciding whether to force a keyframe.

    Images of differing dimensions score 0.0.  Two zero-area images score
    1.0 (vacuously identical) instead of raising ZeroDivisionError.
    """
    a = np.array(img_a.convert("RGBA"), dtype=np.int16)
    b = np.array(img_b.convert("RGBA"), dtype=np.int16)
    if a.shape != b.shape:
        return 0.0
    total = a.shape[0] * a.shape[1]
    if total == 0:
        # FIX: the original divided by zero for zero-area inputs.
        return 1.0
    diff = np.abs(a - b).max(axis=2)
    identical_pixels = int((diff == 0).sum())
    return identical_pixels / total

Return fraction of pixels that are identical (0.0 = completely different, 1.0 = identical). Useful for deciding whether to force a keyframe.