Use Index.add_entry_with_custom_stat
to greatly increase efficiency during LFS working copy checkout.
Without this change, the step of building the index during
the working copy checkout will take a long time as it hashes
all the LFS tiles that have been checked out.
olsen232 committed Jul 21, 2023
1 parent 945c91f commit 5b7e736
Showing 3 changed files with 150 additions and 125 deletions.
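
In outline, the change writes each checked-out LFS tile's pointer blob into the workdir index up front, paired with the stat info of the file just written to the workdir, so the later index build never has to re-hash the tiles. Below is a condensed, hedged sketch of the new _write_tile_or_pam_file_to_workdir helper added to kart/workdir.py further down; the try_reflink import path and the simplified function name are assumptions, and add_entry_with_custom_stat comes from the pygit2 build bundled with Kart rather than stock pygit2.

import pygit2

from kart.lfs_util import dict_to_pointer_file_bytes, get_local_path_from_lfs_hash
from kart.reflink_util import try_reflink  # import path assumed

def write_tile_to_workdir(repo, workdir_index, dataset_path, tilename, oid, size, workdir_root):
    # Copy (or reflink) the tile from the local LFS cache into the workdir.
    lfs_path = get_local_path_from_lfs_hash(repo, oid)
    workdir_path = workdir_root / dataset_path / tilename
    try_reflink(lfs_path, workdir_path)

    # Store the small LFS pointer blob (not the tile itself) in the object database.
    pointer_file = dict_to_pointer_file_bytes({"oid": oid, "size": size})
    pointer_oid = repo.write(pygit2.GIT_OBJ_BLOB, pointer_file)

    # Pre-populate the workdir index: record the pointer blob's OID together with
    # the stat info of the file just written, so a later index rebuild sees the
    # entry as already up to date and never hashes the (potentially huge) tile.
    workdir_index.add_entry_with_custom_stat(
        pygit2.IndexEntry(f"{dataset_path}/{tilename}", pointer_oid, pygit2.GIT_FILEMODE_BLOB),
        workdir_path,
    )

# Usage: open the standalone workdir index, add the entries, then persist it.
# workdir_index = pygit2.Index(index_path)
# write_tile_to_workdir(repo, workdir_index, "dataset", "tile.laz", oid, size, workdir_root)
# workdir_index.write()
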
1 change: 0 additions & 1 deletion kart/lfs_util.py
@@ -1,6 +1,5 @@
import base64
import hashlib
import json
import logging
from pathlib import Path
import re
40 changes: 2 additions & 38 deletions kart/tile/tile_dataset.py
@@ -11,7 +11,6 @@
from kart.list_of_conflicts import ListOfConflicts, InvalidNewValue
from kart.lfs_util import (
get_hash_from_pointer_file,
get_local_path_from_lfs_hash,
pointer_file_bytes_to_dict,
copy_file_to_local_lfs_cache,
merge_pointer_file_dicts,
@@ -73,7 +72,7 @@ class TileDataset(BaseDataset):
def tile_tree(self):
return self.get_subtree(self.TILE_PATH)

def _tile_pointer_blobs_and_dicts(
def tile_pointer_blobs_and_dicts(
self,
spatial_filter=SpatialFilter.MATCH_ALL,
show_progress=False,
@@ -137,7 +136,7 @@ def tile_pointer_blobs(
"""
Returns a generator that yields every tile pointer blob in turn.
"""
for blob, _ in self._tile_pointer_blobs_and_dicts(
for blob, _ in self.tile_pointer_blobs_and_dicts(
spatial_filter=spatial_filter,
show_progress=show_progress,
parse_pointer_dicts=False,
@@ -158,41 +157,6 @@ def tile_lfs_hashes(
):
yield get_hash_from_pointer_file(blob)

def tilenames_with_lfs_hashes(
self,
spatial_filter=SpatialFilter.MATCH_ALL,
fix_extensions=True,
show_progress=False,
):
"""
Returns a generator that yields every tilename along with its LFS hash.
If fix_extensions is True, then the returned name will be modified to have the correct extension for the
type of tile the blob is pointing to.
"""
for blob, pointer_dict in self._tile_pointer_blobs_and_dicts(
spatial_filter=spatial_filter, show_progress=show_progress
):
if fix_extensions:
tile_format = pointer_dict.get("format")
oid = pointer_dict["oid"].split(":", maxsplit=1)[1]
yield self.set_tile_extension(blob.name, tile_format=tile_format), oid
else:
yield blob.name, get_hash_from_pointer_file(blob)

def tilenames_with_lfs_paths(
self,
spatial_filter=SpatialFilter.MATCH_ALL,
fix_extensions=True,
show_progress=False,
):
"""Returns a generator that yields every tilename along with the path where the tile content is stored locally."""
for blob_name, lfs_hash in self.tilenames_with_lfs_hashes(
spatial_filter=spatial_filter,
fix_extensions=fix_extensions,
show_progress=show_progress,
):
yield blob_name, get_local_path_from_lfs_hash(self.repo, lfs_hash)

def decode_path(self, path):
rel_path = self.ensure_rel_path(path)
if rel_path.startswith("tile/"):
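
The two wrappers removed above (tilenames_with_lfs_hashes and tilenames_with_lfs_paths) are not replaced: callers now use the renamed, now-public tile_pointer_blobs_and_dicts directly and derive the tile name and local path themselves, which also keeps the pointer dict (oid, size) at hand for building index entries. A rough equivalent of the old tilenames_with_lfs_paths behaviour on top of the surviving API might look like the sketch below; the real caller in kart/workdir.py further down inlines this logic instead.

from kart.lfs_util import get_local_path_from_lfs_hash

def tilenames_with_lfs_paths(dataset, repo, spatial_filter, show_progress=False):
    # Sketch only: yields (tilename, local LFS cache path), as the removed helper did.
    for pointer_blob, pointer_dict in dataset.tile_pointer_blobs_and_dicts(
        spatial_filter=spatial_filter, show_progress=show_progress
    ):
        tilename = dataset.set_tile_extension(
            pointer_blob.name, tile_format=pointer_dict.get("format")
        )
        oid = pointer_dict["oid"].split(":", maxsplit=1)[1]
        yield tilename, get_local_path_from_lfs_hash(repo, oid)
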
234 changes: 148 additions & 86 deletions kart/workdir.py
@@ -24,7 +24,7 @@
NO_WORKING_COPY,
translate_subprocess_exit_code,
)
from kart.lfs_util import get_local_path_from_lfs_hash
from kart.lfs_util import get_local_path_from_lfs_hash, dict_to_pointer_file_bytes
from kart.lfs_commands import fetch_lfs_blobs_for_pointer_files
from kart.key_filters import RepoKeyFilter
from kart.output_util import InputMode, get_input_mode
@@ -234,6 +234,20 @@ def state_session(self):
del self._session
L.debug("session: new/done")

@contextlib.contextmanager
def workdir_index(self):
"""
Context manager for opening and updating the workdir-index file.
Automatically writes it on close, as long as no exception is thrown.
"""
workdir_index = pygit2.Index(self.index_path)

try:
yield workdir_index
workdir_index.write()
finally:
del workdir_index

def is_dirty(self):
"""
Returns True if there are uncommitted changes in the working copy,
@@ -380,6 +394,7 @@ def reset(
)

# Second pass - actually update the working copy:

if ds_deletes:
self.delete_datasets_from_workdir([base_datasets[d] for d in ds_deletes])
if ds_inserts:
@@ -484,28 +499,32 @@ def _list_new_pointer_blobs_for_diff(self, dataset_diff, tile_dataset):
yield pam_pointer_blob

def write_full_datasets_to_workdir(self, datasets, track_changes_as_dirty=False):
dataset_count = len(datasets)
for i, dataset in enumerate(datasets):
assert isinstance(dataset, TileDataset)

click.echo(
f"Writing tiles for dataset {i+1} of {dataset_count}: {dataset.path}",
err=True,
)
with self.workdir_index() as workdir_index:
dataset_count = len(datasets)
for i, dataset in enumerate(datasets):
assert isinstance(dataset, TileDataset)

click.echo(
f"Writing tiles for dataset {i+1} of {dataset_count}: {dataset.path}",
err=True,
)

wc_tiles_dir = self.path / dataset.path
(wc_tiles_dir).mkdir(parents=True, exist_ok=True)
wc_tiles_dir = self.path / dataset.path
(wc_tiles_dir).mkdir(parents=True, exist_ok=True)

for tilename, lfs_path in dataset.tilenames_with_lfs_paths(
self.repo.spatial_filter,
show_progress=True,
):
if not lfs_path.is_file():
click.echo(
f"Couldn't find tile {tilename} locally - skipping...", err=True
for pointer_blob, pointer_dict in dataset.tile_pointer_blobs_and_dicts(
self.repo.spatial_filter,
show_progress=True,
):
pointer_dict["name"] = dataset.set_tile_extension(
pointer_blob.name, tile_format=pointer_dict.get("format")
)
self._write_tile_or_pam_file_to_workdir(
dataset,
pointer_dict,
workdir_index,
write_to_index=not track_changes_as_dirty,
)
continue
try_reflink(lfs_path, wc_tiles_dir / tilename)

self.write_mosaic_for_dataset(dataset)

@@ -594,7 +613,12 @@ def delete_tiles_for_dataset(
self._reset_workdir_index_for_files(reset_index_files)

def _update_dataset_in_workdir(
self, dataset, diff_to_apply, ds_filter, track_changes_as_dirty
self,
dataset,
diff_to_apply,
*,
ds_filter,
track_changes_as_dirty,
):
ds_path = dataset.path
ds_tiles_dir = (self.path / ds_path).resolve()
@@ -610,52 +634,45 @@
if not tile_diff:
return

for tile_delta in tile_diff.values():
if tile_delta.type in ("update", "delete"):
old_val = tile_delta.old_value
tile_name = old_val.get("sourceName") or old_val.get("name")
tile_path = ds_tiles_dir / tile_name
if tile_path.is_file():
tile_path.unlink()
if not do_update_all:
reset_index_files.append(f"{ds_path}/{tile_name}")
pam_name = tile_delta.old_value.get("pamName")
if pam_name:
pam_path = ds_tiles_dir / pam_name
if pam_path.is_file():
pam_path.unlink()

if tile_delta.type in ("update", "insert"):
new_val = tile_delta.new_value
tile_name = new_val.get("sourceName") or new_val.get("name")
lfs_path = get_local_path_from_lfs_hash(
self.repo, tile_delta.new_value["oid"]
)
if not lfs_path.is_file():
click.echo(
f"Couldn't find tile {tile_name} locally - skipping...",
err=True,
)
continue
try_reflink(lfs_path, ds_tiles_dir / tile_name)
if not do_update_all:
reset_index_files.append(f"{ds_path}/{tile_name}")

pam_name = tile_delta.new_value.get("pamName")
if pam_name:
pam_path = ds_tiles_dir / pam_name
lfs_path = get_local_path_from_lfs_hash(
self.repo, tile_delta.new_value["pamOid"]
with self.workdir_index() as workdir_index:
for tile_delta in tile_diff.values():
if tile_delta.type in ("update", "delete"):
old_val = tile_delta.old_value
tile_name = old_val.get("sourceName") or old_val.get("name")
tile_path = ds_tiles_dir / tile_name
if tile_path.is_file():
tile_path.unlink()
if not do_update_all:
reset_index_files.append(f"{ds_path}/{tile_name}")
pam_name = old_val.get("sourcePamName") or old_val.get("pamName")
if pam_name:
pam_path = ds_tiles_dir / pam_name
if pam_path.is_file():
pam_path.unlink()

if tile_delta.type in ("update", "insert"):
new_val = tile_delta.new_value
tile_name = new_val.get("sourceName") or new_val.get("name")
self._write_tile_or_pam_file_to_workdir(
dataset,
new_val,
workdir_index,
write_to_index=not track_changes_as_dirty,
)
if not lfs_path.is_file():
click.echo(
f"Couldn't find PAM file {pam_name} locally - skipping...",
err=True,
)
continue
try_reflink(lfs_path, ds_tiles_dir / pam_name)
if not do_update_all:
reset_index_files.append(f"{ds_path}/{pam_name}")
reset_index_files.append(f"{ds_path}/{tile_name}")

pam_name = new_val.get("sourcePamName") or new_val.get("pamName")
if pam_name:
self._write_tile_or_pam_file_to_workdir(
dataset,
new_val,
workdir_index,
use_pam_prefix=True,
write_to_index=not track_changes_as_dirty,
)
if not do_update_all:
reset_index_files.append(f"{ds_path}/{pam_name}")

self.write_mosaic_for_dataset(dataset)

@@ -749,7 +766,7 @@ def _reset_workdir_index_for_files(self, file_paths):
sys.exit(translate_subprocess_exit_code(e.returncode))

def _hard_reset_after_commit_for_converted_and_renamed_tiles(
self, datasets, committed_diff
self, datasets, committed_diff, workdir_index
):
"""
Look for tiles that were automatically modified as part of the commit operation
@@ -767,13 +784,15 @@ def _hard_reset_after_commit_for_converted_and_renamed_tiles(
if new_value is None:
continue
if "sourceName" in new_value or "sourceFormat" in new_value:
self._hard_reset_converted_tile(dataset, tile_delta)
self._hard_reset_converted_tile(dataset, tile_delta, workdir_index)
if "pamSourceName" in new_value:
self._hard_reset_renamed_pam_file(dataset, tile_delta)
self._hard_reset_renamed_pam_file(
dataset, tile_delta, workdir_index
)

self.write_mosaic_for_dataset(dataset)

def _hard_reset_converted_tile(self, dataset, tile_delta):
def _hard_reset_converted_tile(self, dataset, tile_delta, workdir_index):
"""
Update an individual tile in the workdir so that it reflects what was actually committed.
"""
@@ -790,14 +809,11 @@ def _hard_reset_converted_tile(self, dataset, tile_delta):
if name_pattern.fullmatch(child.name) and child.is_file():
child.unlink()

tilename = tile_delta.new_value["name"]
lfs_path = get_local_path_from_lfs_hash(self.repo, tile_delta.new_value["oid"])
if not lfs_path.is_file():
click.echo(f"Couldn't find tile {tilename} locally - skipping...", err=True)
else:
try_reflink(lfs_path, ds_tiles_dir / tilename)
self._write_tile_or_pam_file_to_workdir(
dataset, tile_delta.new_value, workdir_index, write_to_index=True
)

def _hard_reset_renamed_pam_file(self, dataset, tile_delta):
def _hard_reset_renamed_pam_file(self, dataset, tile_delta, workdir_index):
"""
Update an individual PAM file in the workdir so that it reflects what was actually committed.
"""
@@ -817,16 +833,61 @@ def _hard_reset_renamed_pam_file(self, dataset, tile_delta):
if pam_name_pattern.fullmatch(child.name) and child.is_file():
child.unlink()

pam_name = tile_delta.new_value["pamName"]
lfs_path = get_local_path_from_lfs_hash(
self.repo, tile_delta.new_value["pamOid"]
self._write_tile_or_pam_file_to_workdir(
dataset,
tile_delta.new_value,
workdir_index,
use_pam_prefix=True,
write_to_index=True,
)

def _write_tile_or_pam_file_to_workdir(
self,
dataset,
tile_summary,
workdir_index,
*,
use_pam_prefix=False,
write_to_index=True,
):
tilename = tile_summary["pamName" if use_pam_prefix else "name"]
oid = tile_summary["pamOid" if use_pam_prefix else "oid"]
size = tile_summary["pamSize" if use_pam_prefix else "size"]
lfs_path = get_local_path_from_lfs_hash(self.repo, oid)
if not lfs_path.is_file():
click.echo(
f"Couldn't find PAM file {pam_name} locally - skipping...", err=True
click.echo(f"Couldn't find {tilename} locally - skipping...", err=True)
return

workdir_path = (self.path / dataset.path / tilename).resolve()
# Sanity check to make sure we're not messing with files we shouldn't:
assert self.path in workdir_path.parents
assert self.repo.workdir_path in workdir_path.parents
assert workdir_path.parents[0].is_dir()

try_reflink(lfs_path, workdir_path)

if write_to_index:
# In general, after writing a dataset, we ask Git to build an index of the workdir,
# which we can then use (via some Git commands) to detect changes to the workdir.
# Git builds an index by getting the hash and the stat info for each file it finds.
# However, the LFS tiles are potentially large and numerous, costly to hash - and
# we already know their hashes. So, in this step, we pre-emptively update the index
# just for the LFS tiles. When Git builds the index, it will find the entries for
# those tiles already written, and detect they are unchanged by checking the stat info,
# so it won't need to update those entries.

# Git would do something similarly efficient if we used Git to do the checkout
# operation in the first place. However, Git would need some changes to be
# able to do this well - understanding Kart datasets, rewriting paths, and
# using reflink where available.

rel_path = f"{dataset.path}/{tilename}"
pointer_file = dict_to_pointer_file_bytes({"oid": oid, "size": size})
pointer_file_oid = self.repo.write(pygit2.GIT_OBJ_BLOB, pointer_file)
workdir_index.add_entry_with_custom_stat(
pygit2.IndexEntry(rel_path, pointer_file_oid, pygit2.GIT_FILEMODE_BLOB),
workdir_path,
)
else:
try_reflink(lfs_path, ds_tiles_dir / pam_name)

def soft_reset_after_commit(
self,
@@ -844,9 +905,10 @@ def soft_reset_after_commit(

# Handle tiles that were, eg, converted to COPC during the commit - the non-COPC
# tiles in the workdir now need to be replaced with the COPC ones:
self._hard_reset_after_commit_for_converted_and_renamed_tiles(
datasets, committed_diff
)
with self.workdir_index() as workdir_index:
self._hard_reset_after_commit_for_converted_and_renamed_tiles(
datasets, committed_diff, workdir_index
)

self._reset_workdir_index_for_datasets(datasets, repo_key_filter=mark_as_clean)
self.delete_tiles(
