diff --git a/kart/lfs_util.py b/kart/lfs_util.py
index 3d465f1d..17a96022 100644
--- a/kart/lfs_util.py
+++ b/kart/lfs_util.py
@@ -1,6 +1,5 @@
 import base64
 import hashlib
-import json
 import logging
 from pathlib import Path
 import re
diff --git a/kart/tile/tile_dataset.py b/kart/tile/tile_dataset.py
index e4259211..ed864a0d 100644
--- a/kart/tile/tile_dataset.py
+++ b/kart/tile/tile_dataset.py
@@ -11,7 +11,6 @@
 from kart.list_of_conflicts import ListOfConflicts, InvalidNewValue
 from kart.lfs_util import (
     get_hash_from_pointer_file,
-    get_local_path_from_lfs_hash,
     pointer_file_bytes_to_dict,
     copy_file_to_local_lfs_cache,
     merge_pointer_file_dicts,
@@ -73,7 +72,7 @@ class TileDataset(BaseDataset):
     def tile_tree(self):
         return self.get_subtree(self.TILE_PATH)
 
-    def _tile_pointer_blobs_and_dicts(
+    def tile_pointer_blobs_and_dicts(
         self,
         spatial_filter=SpatialFilter.MATCH_ALL,
         show_progress=False,
@@ -137,7 +136,7 @@ def tile_pointer_blobs(
         """
         Returns a generator that yields every tile pointer blob in turn.
         """
-        for blob, _ in self._tile_pointer_blobs_and_dicts(
+        for blob, _ in self.tile_pointer_blobs_and_dicts(
             spatial_filter=spatial_filter,
             show_progress=show_progress,
             parse_pointer_dicts=False,
@@ -158,41 +157,6 @@ def tile_lfs_hashes(
         ):
             yield get_hash_from_pointer_file(blob)
 
-    def tilenames_with_lfs_hashes(
-        self,
-        spatial_filter=SpatialFilter.MATCH_ALL,
-        fix_extensions=True,
-        show_progress=False,
-    ):
-        """
-        Returns a generator that yields every tilename along with its LFS hash.
-        If fix_extensions is True, then the returned name will be modified to have the correct extension for the
-        type of tile the blob is pointing to.
-        """
-        for blob, pointer_dict in self._tile_pointer_blobs_and_dicts(
-            spatial_filter=spatial_filter, show_progress=show_progress
-        ):
-            if fix_extensions:
-                tile_format = pointer_dict.get("format")
-                oid = pointer_dict["oid"].split(":", maxsplit=1)[1]
-                yield self.set_tile_extension(blob.name, tile_format=tile_format), oid
-            else:
-                yield blob.name, get_hash_from_pointer_file(blob)
-
-    def tilenames_with_lfs_paths(
-        self,
-        spatial_filter=SpatialFilter.MATCH_ALL,
-        fix_extensions=True,
-        show_progress=False,
-    ):
-        """Returns a generator that yields every tilename along with the path where the tile content is stored locally."""
-        for blob_name, lfs_hash in self.tilenames_with_lfs_hashes(
-            spatial_filter=spatial_filter,
-            fix_extensions=fix_extensions,
-            show_progress=show_progress,
-        ):
-            yield blob_name, get_local_path_from_lfs_hash(self.repo, lfs_hash)
-
     def decode_path(self, path):
         rel_path = self.ensure_rel_path(path)
         if rel_path.startswith("tile/"):
diff --git a/kart/workdir.py b/kart/workdir.py
index 31029281..da0e90b1 100644
--- a/kart/workdir.py
+++ b/kart/workdir.py
@@ -24,7 +24,7 @@
     NO_WORKING_COPY,
     translate_subprocess_exit_code,
 )
-from kart.lfs_util import get_local_path_from_lfs_hash
+from kart.lfs_util import get_local_path_from_lfs_hash, dict_to_pointer_file_bytes
 from kart.lfs_commands import fetch_lfs_blobs_for_pointer_files
 from kart.key_filters import RepoKeyFilter
 from kart.output_util import InputMode, get_input_mode
@@ -234,6 +234,20 @@ def state_session(self):
             del self._session
             L.debug("session: new/done")
 
+    @contextlib.contextmanager
+    def workdir_index(self):
+        """
+        Context manager for opening and updating the workdir-index file.
+        Automatically writes it on close, as long as no exception is thrown.
+        """
+        workdir_index = pygit2.Index(self.index_path)
+
+        try:
+            yield workdir_index
+            workdir_index.write()
+        finally:
+            del workdir_index
+
     def is_dirty(self):
         """
         Returns True if there are uncommitted changes in the working copy,
@@ -380,6 +394,7 @@ def reset(
         )
 
         # Second pass - actually update the working copy:
+
         if ds_deletes:
             self.delete_datasets_from_workdir([base_datasets[d] for d in ds_deletes])
         if ds_inserts:
@@ -484,28 +499,32 @@ def _list_new_pointer_blobs_for_diff(self, dataset_diff, tile_dataset):
             yield pam_pointer_blob
 
     def write_full_datasets_to_workdir(self, datasets, track_changes_as_dirty=False):
-        dataset_count = len(datasets)
-        for i, dataset in enumerate(datasets):
-            assert isinstance(dataset, TileDataset)
-
-            click.echo(
-                f"Writing tiles for dataset {i+1} of {dataset_count}: {dataset.path}",
-                err=True,
-            )
+        with self.workdir_index() as workdir_index:
+            dataset_count = len(datasets)
+            for i, dataset in enumerate(datasets):
+                assert isinstance(dataset, TileDataset)
+
+                click.echo(
+                    f"Writing tiles for dataset {i+1} of {dataset_count}: {dataset.path}",
+                    err=True,
+                )
 
-            wc_tiles_dir = self.path / dataset.path
-            (wc_tiles_dir).mkdir(parents=True, exist_ok=True)
+                wc_tiles_dir = self.path / dataset.path
+                (wc_tiles_dir).mkdir(parents=True, exist_ok=True)
 
-            for tilename, lfs_path in dataset.tilenames_with_lfs_paths(
-                self.repo.spatial_filter,
-                show_progress=True,
-            ):
-                if not lfs_path.is_file():
-                    click.echo(
-                        f"Couldn't find tile {tilename} locally - skipping...", err=True
+                for pointer_blob, pointer_dict in dataset.tile_pointer_blobs_and_dicts(
+                    self.repo.spatial_filter,
+                    show_progress=True,
+                ):
+                    pointer_dict["name"] = dataset.set_tile_extension(
+                        pointer_blob.name, tile_format=pointer_dict.get("format")
+                    )
+                    self._write_tile_or_pam_file_to_workdir(
+                        dataset,
+                        pointer_dict,
+                        workdir_index,
+                        write_to_index=not track_changes_as_dirty,
                     )
-                    continue
-                try_reflink(lfs_path, wc_tiles_dir / tilename)
 
         self.write_mosaic_for_dataset(dataset)
@@ -594,7 +613,12 @@ def delete_tiles_for_dataset(
         self._reset_workdir_index_for_files(reset_index_files)
 
     def _update_dataset_in_workdir(
-        self, dataset, diff_to_apply, ds_filter, track_changes_as_dirty
+        self,
+        dataset,
+        diff_to_apply,
+        *,
+        ds_filter,
+        track_changes_as_dirty,
     ):
         ds_path = dataset.path
         ds_tiles_dir = (self.path / ds_path).resolve()
@@ -610,52 +634,45 @@ def _update_dataset_in_workdir(
         if not tile_diff:
             return
 
-        for tile_delta in tile_diff.values():
-            if tile_delta.type in ("update", "delete"):
-                old_val = tile_delta.old_value
-                tile_name = old_val.get("sourceName") or old_val.get("name")
-                tile_path = ds_tiles_dir / tile_name
-                if tile_path.is_file():
-                    tile_path.unlink()
-                if not do_update_all:
-                    reset_index_files.append(f"{ds_path}/{tile_name}")
-                pam_name = tile_delta.old_value.get("pamName")
-                if pam_name:
-                    pam_path = ds_tiles_dir / pam_name
-                    if pam_path.is_file():
-                        pam_path.unlink()
-
-            if tile_delta.type in ("update", "insert"):
-                new_val = tile_delta.new_value
-                tile_name = new_val.get("sourceName") or new_val.get("name")
-                lfs_path = get_local_path_from_lfs_hash(
-                    self.repo, tile_delta.new_value["oid"]
-                )
-                if not lfs_path.is_file():
-                    click.echo(
-                        f"Couldn't find tile {tile_name} locally - skipping...",
-                        err=True,
-                    )
-                    continue
-                try_reflink(lfs_path, ds_tiles_dir / tile_name)
-                if not do_update_all:
-                    reset_index_files.append(f"{ds_path}/{tile_name}")
-
-                pam_name = tile_delta.new_value.get("pamName")
-                if pam_name:
-                    pam_path = ds_tiles_dir / pam_name
-                    lfs_path = get_local_path_from_lfs_hash(
-                        self.repo, tile_delta.new_value["pamOid"]
+        with self.workdir_index() as workdir_index:
+            for tile_delta in tile_diff.values():
+                if tile_delta.type in ("update", "delete"):
+                    old_val = tile_delta.old_value
+                    tile_name = old_val.get("sourceName") or old_val.get("name")
+                    tile_path = ds_tiles_dir / tile_name
+                    if tile_path.is_file():
+                        tile_path.unlink()
+                    if not do_update_all:
+                        reset_index_files.append(f"{ds_path}/{tile_name}")
+                    pam_name = old_val.get("sourcePamName") or old_val.get("pamName")
+                    if pam_name:
+                        pam_path = ds_tiles_dir / pam_name
+                        if pam_path.is_file():
+                            pam_path.unlink()
+
+                if tile_delta.type in ("update", "insert"):
+                    new_val = tile_delta.new_value
+                    tile_name = new_val.get("sourceName") or new_val.get("name")
+                    self._write_tile_or_pam_file_to_workdir(
+                        dataset,
+                        new_val,
+                        workdir_index,
+                        write_to_index=not track_changes_as_dirty,
                     )
-                    if not lfs_path.is_file():
-                        click.echo(
-                            f"Couldn't find PAM file {pam_name} locally - skipping...",
-                            err=True,
-                        )
-                        continue
-                    try_reflink(lfs_path, ds_tiles_dir / pam_name)
                     if not do_update_all:
-                        reset_index_files.append(f"{ds_path}/{pam_name}")
+                        reset_index_files.append(f"{ds_path}/{tile_name}")
+
+                    pam_name = new_val.get("sourcePamName") or new_val.get("pamName")
+                    if pam_name:
+                        self._write_tile_or_pam_file_to_workdir(
+                            dataset,
+                            new_val,
+                            workdir_index,
+                            use_pam_prefix=True,
+                            write_to_index=not track_changes_as_dirty,
+                        )
+                        if not do_update_all:
+                            reset_index_files.append(f"{ds_path}/{pam_name}")
 
         self.write_mosaic_for_dataset(dataset)
@@ -749,7 +766,7 @@ def _reset_workdir_index_for_files(self, file_paths):
             sys.exit(translate_subprocess_exit_code(e.returncode))
 
     def _hard_reset_after_commit_for_converted_and_renamed_tiles(
-        self, datasets, committed_diff
+        self, datasets, committed_diff, workdir_index
    ):
         """
         Look for tiles that were automatically modified as part of the commit operation
@@ -767,13 +784,15 @@ def _hard_reset_after_commit_for_converted_and_renamed_tiles(
                 if new_value is None:
                     continue
                 if "sourceName" in new_value or "sourceFormat" in new_value:
-                    self._hard_reset_converted_tile(dataset, tile_delta)
+                    self._hard_reset_converted_tile(dataset, tile_delta, workdir_index)
                 if "pamSourceName" in new_value:
-                    self._hard_reset_renamed_pam_file(dataset, tile_delta)
+                    self._hard_reset_renamed_pam_file(
+                        dataset, tile_delta, workdir_index
+                    )
 
             self.write_mosaic_for_dataset(dataset)
 
-    def _hard_reset_converted_tile(self, dataset, tile_delta):
+    def _hard_reset_converted_tile(self, dataset, tile_delta, workdir_index):
         """
         Update an individual tile in the workdir so that it reflects what was actually committed.
         """
@@ -790,14 +809,11 @@ def _hard_reset_converted_tile(self, dataset, tile_delta):
             if name_pattern.fullmatch(child.name) and child.is_file():
                 child.unlink()
 
-        tilename = tile_delta.new_value["name"]
-        lfs_path = get_local_path_from_lfs_hash(self.repo, tile_delta.new_value["oid"])
-        if not lfs_path.is_file():
-            click.echo(f"Couldn't find tile {tilename} locally - skipping...", err=True)
-        else:
-            try_reflink(lfs_path, ds_tiles_dir / tilename)
+        self._write_tile_or_pam_file_to_workdir(
+            dataset, tile_delta.new_value, workdir_index, write_to_index=True
+        )
 
-    def _hard_reset_renamed_pam_file(self, dataset, tile_delta):
+    def _hard_reset_renamed_pam_file(self, dataset, tile_delta, workdir_index):
         """
         Update an individual PAM file in the workdir so that it reflects what was actually committed.
         """
@@ -817,16 +833,61 @@ def _hard_reset_renamed_pam_file(self, dataset, tile_delta):
             if pam_name_pattern.fullmatch(child.name) and child.is_file():
                 child.unlink()
 
-        pam_name = tile_delta.new_value["pamName"]
-        lfs_path = get_local_path_from_lfs_hash(
-            self.repo, tile_delta.new_value["pamOid"]
+        self._write_tile_or_pam_file_to_workdir(
+            dataset,
+            tile_delta.new_value,
+            workdir_index,
+            use_pam_prefix=True,
+            write_to_index=True,
        )
+
+    def _write_tile_or_pam_file_to_workdir(
+        self,
+        dataset,
+        tile_summary,
+        workdir_index,
+        *,
+        use_pam_prefix=False,
+        write_to_index=True,
+    ):
+        tilename = tile_summary["pamName" if use_pam_prefix else "name"]
+        oid = tile_summary["pamOid" if use_pam_prefix else "oid"]
+        size = tile_summary["pamSize" if use_pam_prefix else "size"]
+        lfs_path = get_local_path_from_lfs_hash(self.repo, oid)
         if not lfs_path.is_file():
-            click.echo(
-                f"Couldn't find PAM file {pam_name} locally - skipping...", err=True
+            click.echo(f"Couldn't find {tilename} locally - skipping...", err=True)
+            return
+
+        workdir_path = (self.path / dataset.path / tilename).resolve()
+        # Sanity check to make sure we're not messing with files we shouldn't:
+        assert self.path in workdir_path.parents
+        assert self.repo.workdir_path in workdir_path.parents
+        assert workdir_path.parents[0].is_dir()
+
+        try_reflink(lfs_path, workdir_path)
+
+        if write_to_index:
+            # In general, after writing a dataset, we ask Git to build an index of the workdir,
+            # which we can then use (via some Git commands) to detect changes to the workdir.
+            # Git builds an index by getting the hash and the stat info for each file it finds.
+            # However, the LFS tiles are potentially large and numerous, costly to hash - and
+            # we already know their hashes. So, in this step, we pre-emptively update the index
+            # just for the LFS tiles. When Git builds the index, it will find the entries for
+            # those tiles already written, and detect they are unchanged by checking the stat info,
+            # so it won't need to update those entries.
+
+            # Git would do something similarly efficient if we used Git to do the checkout
+            # operation in the first place. However, Git would need some changes to be
+            # able to do this well - understanding Kart datasets, rewriting paths, and
+            # using reflink where available.
+
+            rel_path = f"{dataset.path}/{tilename}"
+            pointer_file = dict_to_pointer_file_bytes({"oid": oid, "size": size})
+            pointer_file_oid = self.repo.write(pygit2.GIT_OBJ_BLOB, pointer_file)
+            workdir_index.add_entry_with_custom_stat(
+                pygit2.IndexEntry(rel_path, pointer_file_oid, pygit2.GIT_FILEMODE_BLOB),
+                workdir_path,
            )
-        else:
-            try_reflink(lfs_path, ds_tiles_dir / pam_name)
 
     def soft_reset_after_commit(
         self,
@@ -844,9 +905,10 @@ def soft_reset_after_commit(
 
         # Handle tiles that were, eg, converted to COPC during the commit - the non-COPC
        # tiles in the workdir now need to be replaced with the COPC ones:
-        self._hard_reset_after_commit_for_converted_and_renamed_tiles(
-            datasets, committed_diff
-        )
+        with self.workdir_index() as workdir_index:
+            self._hard_reset_after_commit_for_converted_and_renamed_tiles(
+                datasets, committed_diff, workdir_index
+            )
 
         self._reset_workdir_index_for_datasets(datasets, repo_key_filter=mark_as_clean)
         self.delete_tiles(