From 260e2ddbb5013199c1cd760726ac68c295675b09 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Wed, 12 May 2021 10:14:13 +0100 Subject: [PATCH] Changes for s3nc_cfa_mv --- S3netCDF4/_s3netCDF4.pyx | 17 +++- bin/s3nc_cfa_info.py | 16 ++-- bin/s3nc_cfa_mv.py | 199 +++++++++++++++++++++++++++++++++++++++ test/test_split.py | 12 ++- 4 files changed, 227 insertions(+), 17 deletions(-) create mode 100644 bin/s3nc_cfa_mv.py diff --git a/S3netCDF4/_s3netCDF4.pyx b/S3netCDF4/_s3netCDF4.pyx index 09e8b6d..c349649 100644 --- a/S3netCDF4/_s3netCDF4.pyx +++ b/S3netCDF4/_s3netCDF4.pyx @@ -1238,9 +1238,14 @@ class s3Dataset(object): # create the file object, this controls access to the various # file backends that are supported + if mode == 'a': # allow an initial read on append to interpret the + # netCDF file type + open_mode = 'r' + else: + open_mode = mode self._managed_object = self._file_manager.request_file( filename, - mode=mode, + mode=open_mode, lock=True ) @@ -1297,8 +1302,8 @@ class s3Dataset(object): self._file_manager.open_success(filename) # manage the interactions with the data_object self._managed_object.data_object = self._nc_dataset - # handle read-only mode - elif mode == 'r': + # handle read-only / append mode + elif mode == 'r' or mode == 'a': # check the file exists if ( self._managed_object.open_state == OpenFileRecord.DOES_NOT_EXIST @@ -1314,6 +1319,10 @@ class s3Dataset(object): raise IOError("File: {} is not a netCDF file".format(filename)) # read the file in, or create it if self._managed_object.file_object.remote_system: + if mode == 'a': + # append not supported for remote file systems + raise APIException("Mode ''" + mode + "'' not supported " + "for remote storage system.") # stream into memory nc_bytes = self._managed_object.file_object.read() # call the base constructor @@ -1344,7 +1353,7 @@ class s3Dataset(object): parser.read(self, filename) else: # no other modes are supported - raise APIException("Mode " + mode + " not supported.") + raise APIException("Mode ''" + mode + "'' not supported.") def __enter__(self): """Allow with s3Dataset statements.""" diff --git a/bin/s3nc_cfa_info.py b/bin/s3nc_cfa_info.py index 3238006..ccc5627 100755 --- a/bin/s3nc_cfa_info.py +++ b/bin/s3nc_cfa_info.py @@ -211,37 +211,37 @@ def print_dataset_info(input_dataset, group, variable, partition, metadata): ) parser.add_argument( - "-group", action="store", default="all", + "--group", action="store", default="all", metavar="", help=( "Name of a group to print information about, or print all groups. " - "-group=all|" + "--group=all|" ) ) parser.add_argument( - "-variable", action="store", default="all", + "--variable", action="store", default="all", metavar="", help=( "Name of a variable to print information about, print all or no" "variables. " - "-variable=all|none|" + "--variable=all|none|" ) ) parser.add_argument( - "-partition", action = "store", default="none", + "--partition", action = "store", default="none", metavar="", help=( "Print the information about a partition. " - "-partition=all|none|" + "--partition=all|none|" ) ) parser.add_argument( - "-metadata", action = "store_true", default=False, + "--metadata", action = "store_true", default=False, help=( "Print the metadata for groups, dimensions and variables" - "-metadata" + "--metadata" ) ) diff --git a/bin/s3nc_cfa_mv.py b/bin/s3nc_cfa_mv.py new file mode 100644 index 0000000..3a6606b --- /dev/null +++ b/bin/s3nc_cfa_mv.py @@ -0,0 +1,199 @@ +#! /usr/bin/env python + +__copyright__ = "(C) 2019-2021 Science and Technology Facilities Council" +__license__ = "BSD - see LICENSE file in top-level directory" +__authors__ = "Neil Massey" + +"""Program to rewrite partition infomation in a CFA-netCDF master-array file to reflect that a sub-array file has moved. +""" + +import argparse +from urllib.parse import urlparse +import os +import numpy as np +import sys + +from S3netCDF4._s3netCDF4 import s3Dataset as s3Dataset +from S3netCDF4.CFA._CFAClasses import CFAPartition + +def split_file_name(input_name): + # split into prefix and filename + # this should work on urls and file paths + file_split = input_name.split("/") + file_path = "/".join(file_split[:-1]) + file_name = file_split[-1] + return file_path, file_name + +def update_file_in_partition(prefix, cfa_var, partition_index): + """Update the file_information in a variable for a given partition. + Args: + prefix (string): new prefix for files + cfa_var (CFAVariable): variable to alter the partition for + partition_index (np.ndarray): index of the partition to alter + Returns: + None + """ + # get the partition from the index + partition = cfa_var.getPartition(partition_index) + # get the file name and file path: + file_path, file_name = split_file_name(partition.file) + # new file path: + new_file_path = prefix + "/" + file_name + # construct a new partition + new_part = CFAPartition( + index = partition.index, + location = partition.location, + ncvar = partition.ncvar, + file = new_file_path, + format = partition.format, + shape = partition.shape + ) + # write (and replace) the old partition + cfa_var.writePartition(new_part) + +def update_file_in_variable(cfa_var, prefix, partition="all"): + """Update the file_information in a variable for a given partition. + Args: + cfa_var (CFAVariable): CFA variable to alter, containing the partitions + prefix (string): new prefix for files + partition (string): index of the partition to alter, or 'all' + Returns: + None + """ + if partition == "all": + pmshape = cfa_var.getPartitionMatrixShape() + for partition_index in np.ndindex(*pmshape): + update_file_in_partition(prefix, cfa_var, partition_index) + else: + # convert from partition string + partition_index = np.fromstring(args.partition, dtype='i', sep=', ') + update_file_in_partition(prefix, cfa_var, partition_index) + +def update_file_in_group(cfa_group, prefix, variable="all", partition="all"): + """Update the file_information in a group for a given partition. + Args: + cfa_group (CFAGroup): CFA group to alter, containing the cfa_variables + prefix (string): new prefix for files + variable (string): name of the variable to alter, or 'all' + partition (string): index of the partition to alter, or 'all' + Returns: + None + """ + if variable == "all": + for var in cfa_group.getVariables(): + cfa_var = cfa_group.getVariable(var) + update_file_in_variable(cfa_var, prefix, partition) + else: + if variable in cfa_group.getVariables(): + cfa_var = cfa_group.getVariable(variable) + update_file_in_variable(cfa_var, prefix, partition) + + +def update_file_in_partitions(input_dataset, + prefix, + group="all", + variable="all", + partition="all"): + """Update the file information in the given partition. + This partition could be all or a single partition specified by [t,z,x,y] + for example. + + Args: + input_dataset (s3Dataset): dataset to alter + prefix (string): new prefix for files + group (string): name of group to alter, or 'all', or 'none' + variable (string): name of variable to alter, or 'all' + partition (string): name of partition to alter, or 'all' + + Returns: + None + """ + # get the cfa structure from the dataset + cfa_dataset = input_dataset._cfa_dataset + if group == "all": + for grp in cfa_dataset.getGroups(): + cfa_group = cfa_dataset.getGroup(grp) + update_file_in_group(cfa_group, prefix, variable, partition) + else: + # named group + cfa_group = input_dataset.getGroup(group) + update_file_in_group(cfa_group, prefix, variable, partition) + + +if __name__ == "__main__": + """Utility program to alter the structure of a CFA-netCDF master array + file, either on the disk or remotely on S3 storage, to change the + location of the sub-array file. Note that it doesn't actually move any + files, it just updates the record in the partition matrix. + It will only update the prefix of the file location, not the actual + filename. i.e. it replaces os.path.dirname + Options are: + 1. The input master-array file, write back to the same file + 2. The partition to change + --partition=all|none| default: --partition=all + 3. The prefix of the new address for the file location + --prefix= + """ + # set up and parse the arguments + parser = argparse.ArgumentParser( + prog="s3nc_cfa_mv", + formatter_class=argparse.RawTextHelpFormatter, + description=( + "Alter the paths of the sub-array files in the master-array file to" + " reflect that those sub-array files have been moved to a new " + " location. It will only update the prefix of the file location, " " not the actual filename." + ) + ) + + parser.add_argument( + "input", action="store", default="", metavar="", + help=( + "Path of the CFA-netCDF master-array file to alter." + ) + ) + + parser.add_argument( + "--group", action="store", default="all", + metavar="", + help=( + "Name of a group to change file prefix for, or change all groups. " + "--group=all|" + ) + ) + + parser.add_argument( + "--variable", action="store", default="all", + metavar="", + help=( + "Name of a variable to change file prefix for, or change all " "variables." + "--variable=all|" + ) + ) + + parser.add_argument( + "--partition", action = "store", default="all", + metavar="", + help=( + "Choose the partition to change the file location prefix for." + "--partition=all" + ) + ) + + parser.add_argument( + "--prefix", action = "store", default="none", required=True, + metavar="", + help=( + "New file location prefix" + ) + ) + args = parser.parse_args() + + # get the input file + input_path = os.path.expanduser(args.input) + # open the input dataset in append mode + input_dataset = s3Dataset(input_path, mode='a') + # Update the prefix in the partitions + update_file_in_partitions(input_dataset, args.prefix, args.group, + args.variable, args.partition) + # close the file to save the changes + input_dataset.close() diff --git a/test/test_split.py b/test/test_split.py index 4eb633c..a9f2a6f 100644 --- a/test/test_split.py +++ b/test/test_split.py @@ -6,7 +6,7 @@ from S3netCDF4.CFA._CFAClasses import CFADataset from S3netCDF4._s3netCDF4 import s3Dataset -TESTFILE = "/Users/BNL28/Data/nc/ta_Amon_HadCM3_rcp45_r10i1p1_200601-203012.nc" +TESTFILE = "/Users/dhk63261/Archive/cmip5/ta_Amon_HadCM3_rcp45_r10i1p1_203101-203512.nc" def nca_equivalence(ncfile1, ncfile2, variable='ta'): @@ -30,6 +30,8 @@ def nca_equivalence(ncfile1, ncfile2, variable='ta'): # We don't do all data coz it would take a long time assert (xx == yy).all(), "Data in arrays does not match" + x.close() + y.close() # now check file headers raise NotImplementedError("This doesn't mean the test has failed, just the test code is not finished") @@ -40,16 +42,16 @@ class TestSplit(unittest.TestCase): """ All the necessary splitter tests""" def setUp(self): - self.ncafile1 = '/tmp/things1.nca' - self.ncapath = '/tmp/things1/things1.ta.*' - self.ncafile2 = '/tmp/things2.nca' + self.ncafile1 = '/Users/dhk63261/Archive/things1.nca' + self.ncapath = '/Users/dhk63261/Archive/things1/things1.ta.*' + self.ncafile2 = '/Users/dhk63261/Archive/things2.nca' def _split_and_aggregate(self, cfa1, cfa2): # for now use real disk ... input = TESTFILE subarray_size = 50 * 1024 * 1024 subarray_path = "" - subarray_shape = "[2, 17, 73, 96]" + subarray_shape = "[1, 17, 73, 96]" split_into_CFA(self.ncafile1, input, subarray_path,