Source code for acutils.handler

import numpy as np
import os

from . import file
from . import multiprocess
from . import sheet



class DataHandler:
    '''
    Main class to handle data on disk.

    ATTRIBUTES
    ----------
    (str) datapath:
        Absolute path to the directory that contains source files.
    (array/list like of str) file_extensions=None:
        Allowed source file extensions.
    (int) allowed_cpus=1:
        Maximum amount of CPUs used to compute.
    (int) seed=871:
        Seed used to initialize the numpy randomizer.
    (str) str_ndarray_dtype="U256":
        Data type used for any string numpy array. It defines the maximum
        length of strings, especially those read from sheet files when loading
        labels and groups.
    (numpy.array<str_ndarray_dtype>) files=None:
        The filenames (with extension) of the files to load.
    (numpy.array<str_ndarray_dtype>) labels=None:
        The labels of the files.
    (numpy.array<str_ndarray_dtype>) unique_labels=None:
        The unique values among the labels.
    (numpy.array<str_ndarray_dtype>) groups=None:
        The groups of the files; files sharing a group end up in the same
        dataset split.
    '''

    def __init__(self, datapath, file_extensions=None, allowed_cpus=1,
                 seed=871, str_ndarray_dtype="U256"):
        '''
        Initialize a DataHandler instance to handle data on disk.

        PARAMETERS
        ----------
        (str) datapath:
            Absolute path to the directory that contains source files.
        (array/list like of str) file_extensions=None:
            Allowed source file extensions.
        (int) allowed_cpus=1:
            Maximum amount of CPUs used to compute.
        (int) seed=871:
            Seed used to initialize the numpy randomizer.
        (str) str_ndarray_dtype="U256":
            Data type used for any string numpy array. It defines the maximum
            length of strings, especially those read from sheet files when
            loading labels and groups.

        RETURNS
        -------
        None

        RAISES
        ------
        (NotADirectoryError) err:
            If the absolute path doesn't lead to an existing directory.
        '''
        if not os.path.isdir(datapath):
            raise NotADirectoryError("datapath must be an absolute path to an "
                                     "existing directory")
        self.datapath = datapath
        self.file_extensions = file_extensions
        self.allowed_cpus = allowed_cpus
        self.seed = seed
        self.str_ndarray_dtype = str_ndarray_dtype
        self.files = None
        self.labels = None
        self.unique_labels = None
        self.groups = None

    def _format_sheet(self, df, filecol=None, labelcol=None, othercols=None,
                      clueless_words=None):
        '''
        Format a dataframe by deleting rows with an empty cell, and add the
        extension to filenames that do not already contain it, when only one
        extension is allowed.

        PARAMETERS
        ----------
        (pandas.DataFrame) df:
            Dataframe to format.
        (str) filecol=None:
            Name of the column that contains filenames, not used here.
        (str) labelcol=None:
            Name of the column that contains labels, not used here.
        (list<str>) othercols=None:
            Names of the other columns, not used here.
        (array/list like of str) clueless_words=None:
            Strings considered as None.

        RETURNS
        -------
        (pandas.DataFrame) df:
            Formatted dataframe.

        RAISES
        ------
        None
        '''
        return sheet.delete_clueless_rows(df, clueless_words)

    def _load_sheet(self, sheetpath, filecol, labelcol, othercols=None):
        '''
        Load a sheet file and keep the indicated columns.

        PARAMETERS
        ----------
        (str) sheetpath:
            Absolute path to the sheet which contains information about data.
        (str) filecol:
            Name of the column that contains filenames.
        (str) labelcol:
            Name of the column that contains labels.
        (iterable of str) othercols=None:
            Names of the other columns to keep.

        RETURNS
        -------
        (pandas.DataFrame) df:
            Loaded dataframe.

        RAISES
        ------
        (ValueError) err:
            If the file extension is not supported.
        '''
        if othercols is None:
            othercols = []
        cols = [col for col in othercols]
        cols.append(filecol)
        cols.append(labelcol)
        return sheet.read_df_from_any_avalaible_extensions(sheetpath)[cols]
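
    # Hedged usage sketch (assumption, not library code): creating a handler
    # for a hypothetical data directory. The path, extensions and CPU count
    # below are illustrative values, not defaults shipped with acutils.
    #
    #   handler = DataHandler("/abs/path/to/data",
    #                         file_extensions=[".png", ".jpg"],
    #                         allowed_cpus=4)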

    def load_data_fromdatapath(self):
        '''
        Load data files from the data directory, assuming that those files are
        directly inside the data directory. The filenames are stored as the
        "files" attribute.

        PARAMETERS
        ----------
        None

        RETURNS
        -------
        None
        '''
        anyfile = (self.file_extensions is None  # if no extension, keep any
                   or len(self.file_extensions) == 0)
        self.files = np.array(
            [filename for filename in os.listdir(self.datapath)
             if anyfile or filename.endswith(tuple(self.file_extensions))],
            dtype=self.str_ndarray_dtype)
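
    # Hedged example (assumption, not library code): after construction,
    # loading the filenames found directly under datapath.
    #
    #   handler.load_data_fromdatapath()
    #   print(handler.files)  # numpy array of filenames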

    def load_labels_fromsheet(self, sheetpath, idcol, labelcol, othercols=None,
                              clueless_words=None, delete_unlabeled_files=True,
                              require_full_filename_match=False):
        '''
        Load data labels from a sheet file. You must load files before calling
        this, for instance with "load_data_fromdatapath". The labels are stored
        as the "labels" attribute and their unique values are stored as the
        "unique_labels" attribute.

        PARAMETERS
        ----------
        (str) sheetpath:
            Absolute path to the sheet which contains information about data.
        (str) idcol:
            Name of the column that contains at least a part of the filename.
        (str) labelcol:
            Name of the column that contains labels.
        (array/list like of str) othercols=None:
            Names of the other columns to keep.
        (array/list like of str) clueless_words=None:
            Strings considered as None.
        (bool) delete_unlabeled_files=True:
            If True, delete each file without a label.
        (bool) require_full_filename_match=False:
            If True, the value in idcol must be exactly the filename; otherwise
            an idcol value included in the filename is considered a match. Note
            that if the idcol value is included in multiple filenames, it is
            associated with the first one found, in descending length order.

        RETURNS
        -------
        None
        '''
        # Load and format sheet
        df = self._load_sheet(sheetpath, idcol, labelcol, othercols)
        df[idcol] = df[idcol].astype(str)
        df[labelcol] = df[labelcol].astype(str)
        df = self._format_sheet(df, idcol, labelcol, othercols, clueless_words)

        # Browse ids by descending length, so that a longer id is matched
        # before any shorter id it contains
        indices = np.argsort(np.array([len(idstr) for idstr in df[idcol].values]))
        df = df.reindex(index=indices[::-1])

        # Get the label of each corresponding file ('' marks unlabeled files)
        labels = np.full(self.files.shape, '', dtype=self.str_ndarray_dtype)
        for i, filename in enumerate(self.files):
            for filepart, label in zip(df[idcol].values, df[labelcol].values):
                if filepart in filename:
                    if require_full_filename_match and filepart != filename:
                        continue
                    labels[i] = label
                    break

        # Update labels
        if np.all(labels == ''):
            print('|WRN| no label kept, nothing changed. Leaving.')
        else:
            if delete_unlabeled_files:
                ids = np.where(labels != '')[0]
                self.files = self.files[ids]
                self.labels = labels[ids]
            else:
                self.labels = labels
            self.unique_labels = np.unique(self.labels
                                           ).astype(self.str_ndarray_dtype)
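
    # Hedged example (assumption, not library code): loading labels from a
    # hypothetical sheet whose "id" column holds (part of) each filename and
    # whose "label" column holds the class. The path and column names are
    # illustrative.
    #
    #   handler.load_labels_fromsheet("/abs/path/to/labels.csv",
    #                                 idcol="id", labelcol="label")
    #   print(handler.unique_labels)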

    def load_labeled_data_fromdatapath(self):
        '''
        Load data files and labels from the data directory, assuming that those
        files are inside subdirectories named after the unique labels. The
        filenames are stored as the "files" attribute. The labels are stored as
        the "labels" attribute and their unique values are stored as the
        "unique_labels" attribute.

        PARAMETERS
        ----------
        None

        RETURNS
        -------
        None
        '''
        # Init arrays
        unique_labels = np.array(
            [label for label in os.listdir(self.datapath)
             if os.path.isdir(os.path.join(self.datapath, label))],
            dtype=self.str_ndarray_dtype)
        labels = np.array([], dtype=self.str_ndarray_dtype)
        files = np.array([], dtype=self.str_ndarray_dtype)

        # Fill them label per label with the found files (if the ext is allowed)
        anyfile = (self.file_extensions is None  # if no extension, keep any
                   or len(self.file_extensions) == 0)
        kept_labels = []  # numpy arrays do not support per-element deletion
        for label in unique_labels:
            incoming_files = np.array(
                [os.path.join(label, filename)
                 for filename in os.listdir(os.path.join(self.datapath, label))
                 if anyfile or filename.endswith(tuple(self.file_extensions))])
            if incoming_files.size == 0:
                continue  # a label without data is useless
            kept_labels.append(label)
            files = np.concatenate([files, incoming_files])
            labels = np.concatenate(
                [labels, np.repeat(np.array([label]), incoming_files.size)]
            ).astype(self.str_ndarray_dtype)
        unique_labels = np.array(kept_labels, dtype=self.str_ndarray_dtype)

        # Update attributes only if not empty
        if files.size == 0 or labels.size == 0 or unique_labels.size == 0:
            print('|WRN| no file or label kept, nothing changed. Leaving.')
        else:
            self.unique_labels = unique_labels
            self.labels = labels
            self.files = files
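
    # Hedged example (assumption, not library code): if datapath contains one
    # subdirectory per class (e.g. "benign/" and "malignant/", hypothetical
    # names), this single call fills files, labels and unique_labels.
    #
    #   handler.load_labeled_data_fromdatapath()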

    def load_groups_fromsheet(self, sheetpath, idcol, groupcol,
                              clueless_words=None,
                              require_full_filename_match=False):
        '''
        Load data groups from a sheet file. You must load files before calling
        this, for instance with "load_data_fromdatapath". The groups are stored
        as the "groups" attribute.

        PARAMETERS
        ----------
        (str) sheetpath:
            Absolute path to the sheet which contains information about data.
        (str) idcol:
            Name of the column that contains at least a part of the filename.
        (str) groupcol:
            Name of the column that contains groups.
        (array/list like of str) clueless_words=None:
            Strings considered as None.
        (bool) require_full_filename_match=False:
            If True, the value in idcol must be exactly the filename; otherwise
            an idcol value included in the filename is considered a match. Note
            that if the idcol value is included in multiple filenames, it is
            associated with the first one found, in descending length order.

        RETURNS
        -------
        None
        '''
        # Load and format sheet
        df = sheet.read_df_from_any_avalaible_extensions(sheetpath)
        df[idcol] = df[idcol].astype(str)
        df[groupcol] = df[groupcol].astype(str)

        # Be sure that '' is considered as empty
        if clueless_words is None:
            clueless_words = ['']
        else:
            clueless_words = [word for word in clueless_words]

        # Browse ids by descending length, so that a longer id is matched
        # before any shorter id it contains
        indices = np.argsort(np.array([len(idstr) for idstr in df[idcol].values]))
        df = df.reindex(index=indices[::-1])

        # Get the group of each corresponding file (if available)
        groups = np.empty(self.files.shape, dtype=self.str_ndarray_dtype)
        for i, filename in enumerate(self.files):
            groups[i] = filename  # without a group, the filename becomes the group
            for filepart, group in zip(df[idcol].values, df[groupcol].values):
                if filepart in filename:
                    if require_full_filename_match and filepart != filename:
                        continue
                    groups[i] = (group if group not in clueless_words
                                 else filename)
                    break
        self.groups = groups
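
    # Hedged example (assumption, not library code): loading groups from a
    # hypothetical sheet so that, for instance, all files of the same patient
    # end up in the same split. The path and column names are illustrative.
    #
    #   handler.load_groups_fromsheet("/abs/path/to/labels.csv",
    #                                 idcol="id", groupcol="patient")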

    def _balance_dataset(self, data):
        '''
        Balance a dataset so the amount of data is equal for each label.

        PARAMETERS
        ----------
        (dict<str;str>) data:
            Dictionary with filename as key and label as value.

        RETURNS
        -------
        (dict<str;str>) balanced_data:
            Data with the superfluous files removed to balance it.
        '''
        balanced_data = data.copy()

        # Check how much data should be kept (amount of the label with the
        # least data)
        files = np.array(list(balanced_data.keys()))
        labels = np.array(list(balanced_data.values()))
        lengths = np.array(
            [np.where(labels == label)[0].size for label in self.unique_labels])
        kept_data = np.min(lengths[np.nonzero(lengths)])  # if no data, error

        # Balance data for each label
        for length, label in zip(lengths, self.unique_labels):
            diff = length - kept_data  # how much data should be deleted
            if diff > 0:
                ids = np.where(labels == label)[0]
                np.random.seed(self.seed)  # so the balance is repeatable
                np.random.shuffle(ids)     # but still random
                # delete "diff" files from this label
                for filename in files[ids][:diff]:
                    del balanced_data[filename]
        return balanced_data

    def balance_datasets(self, tdata, vdata):
        '''
        Balance datasets so the amount of data is equal for each label.
        This basically calls the "_balance_dataset" method with tdata, then
        vdata.

        PARAMETERS
        ----------
        (dict<str;str>) tdata:
            Train dictionary with filename as key and label as value.
        (dict<str;str>) vdata:
            Val dictionary with filename as key and label as value.

        RETURNS
        -------
        (dict<str;str>) balanced_tdata:
            Train data with the superfluous files removed to balance it.
        (dict<str;str>) balanced_vdata:
            Val data with the superfluous files removed to balance it.
        '''
        return self._balance_dataset(tdata), self._balance_dataset(vdata)

    def _split_using_groups(self, train_percentage=0.7, balance=False):
        '''
        Split labeled data into train and val datasets, considering data
        groups.

        PARAMETERS
        ----------
        (float) train_percentage=0.7:
            Percentage of data expected in the train dataset.
        (bool) balance=False:
            If True, call the "balance_datasets" method before returning the
            dictionaries.

        RETURNS
        -------
        (dict<str;str>) tdata:
            Train dictionary with filename as key and label as value.
        (dict<str;str>) vdata:
            Val dictionary with filename as key and label as value.
        '''
        if (self.files is None or self.labels is None
                or self.unique_labels is None):
            print('|WRN| Load labeled data before calling "split". '
                  '"files", "labels" and "unique_labels" attributes should not '
                  'be None. Leaving.')
            return None
        if self.groups is None:
            print('|WRN| Load groups before calling "split".')
            return None
        if train_percentage < 0 or train_percentage > 1:
            print('|WRN| should be: 0.00 <= "train_percentage" <= 1.00. Leaving.')
            return None

        # Init lists to store train/val files and labels
        train_files, train_labels = [], []
        val_files, val_labels = [], []

        # Fill train/val files/labels label per label, referring to
        # train_percentage
        for label in self.unique_labels:
            # Get groups for data with this label
            iftl = np.where(self.labels == label)[0]  # indices for this label
            unique_groups, _, counts = np.unique(self.groups[iftl],
                                                 return_index=True,
                                                 return_counts=True)

            # Shuffle to split it randomly
            np.random.seed(self.seed)  # so the split is repeatable
            browsing_order = np.random.choice(unique_groups.shape[0],
                                              size=unique_groups.shape[0],
                                              replace=False).astype(np.uint64)

            # Split data using file amounts (but still considering groups)
            quantity = 0
            train_cap = train_percentage * iftl.shape[0]
            for i in browsing_order:
                positions = np.where(self.groups[iftl] == unique_groups[i])[0]
                for pos in positions:  # each file index of the same group
                    if quantity <= train_cap:
                        train_files.append(self.files[iftl][pos])
                        train_labels.append(self.labels[iftl][pos])
                    else:  # fill val only if train is full
                        val_files.append(self.files[iftl][pos])
                        val_labels.append(self.labels[iftl][pos])
                quantity += counts[i]

        # Store files and labels inside dictionaries (tdata for train,
        # vdata for val)
        tdata = {filename: label
                 for filename, label in zip(train_files, train_labels)}
        vdata = {filename: label
                 for filename, label in zip(val_files, val_labels)}

        # Balance datasets (if required)
        if balance:
            tdata, vdata = self.balance_datasets(tdata, vdata)
        return tdata, vdata

    # @TODO maybe add an optional val_percentage and define a test set

    def split(self, train_percentage=0.7, balance=False, ignore_groups=False):
        '''
        Split labeled data into train and val datasets.

        PARAMETERS
        ----------
        (float) train_percentage=0.7:
            Percentage of data expected in the train dataset.
        (bool) balance=False:
            If True, call the "balance_datasets" method before returning the
            dictionaries.
        (bool) ignore_groups=False:
            If True, ignore groups for the split even though they are defined.
            If the "groups" attribute is not defined, it is ignored anyway. If
            it is defined and "ignore_groups" is False, the split is done by
            calling the "_split_using_groups" method.

        RETURNS
        -------
        (dict<str;str>) tdata:
            Train dictionary with filename as key and label as value.
        (dict<str;str>) vdata:
            Val dictionary with filename as key and label as value.
        '''
        if (self.files is None or self.labels is None
                or self.unique_labels is None):
            print('|WRN| Load labeled data before calling "split". '
                  '"files", "labels" and "unique_labels" attributes should '
                  'not be None. Leaving.')
            return None
        if train_percentage < 0 or train_percentage > 1:
            print('|WRN| Should be: 0.00 <= "train_percentage" <= 1.00. Leaving.')
            return None
        if not ignore_groups and self.groups is not None:
            return self._split_using_groups(train_percentage, balance)

        # Init arrays to store train/val files and labels
        train_files = np.array([], dtype=self.str_ndarray_dtype)
        train_labels = np.array([], dtype=self.str_ndarray_dtype)
        val_files = np.array([], dtype=self.str_ndarray_dtype)
        val_labels = np.array([], dtype=self.str_ndarray_dtype)
        val_percentage = 1 - train_percentage

        # Fill train/val files/labels label per label, referring to
        # train_percentage
        for label in self.unique_labels:
            ids = np.where(self.labels == label)[0]
            np.random.seed(self.seed)  # so the split is repeatable
            np.random.shuffle(ids)     # but still random
            startsat = int(np.ceil(ids.size * val_percentage))
            train_files = np.concatenate([train_files,
                                          self.files[ids[startsat:]]])
            train_labels = np.concatenate([train_labels,
                                           self.labels[ids[startsat:]]])
            val_files = np.concatenate([val_files, self.files[ids[:startsat]]])
            val_labels = np.concatenate([val_labels,
                                         self.labels[ids[:startsat]]])

        # Store files and labels inside dictionaries (tdata for train,
        # vdata for val)
        tdata = {filename: label
                 for filename, label in zip(train_files, train_labels)}
        vdata = {filename: label
                 for filename, label in zip(val_files, val_labels)}

        # Balance datasets (if required)
        if balance:
            tdata, vdata = self.balance_datasets(tdata, vdata)
        return tdata, vdata
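
    # Hedged example (assumption, not library code): a reproducible 70/30
    # split, optionally balanced per label. If groups were loaded, files of
    # the same group stay on the same side of the split.
    #
    #   tdata, vdata = handler.split(train_percentage=0.7, balance=True)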

    def _distribute_data(self, dirpath):
        '''
        Distribute the files to process and split them between the allowed
        CPUs. The distribution is returned as 2 lists of lists of src or
        dstdir.

        PARAMETERS
        ----------
        (str) dirpath:
            Absolute path to the directory for treated files.

        RETURNS
        -------
        (list<list<str>>) packed_srcs:
            Source files absolute paths, per process.
        (list<list<str>>) packed_dstdirs:
            Destination directories absolute paths, per process.
        '''
        # Update destination directories with labels (if defined)
        if self.labels is not None:
            dstdirs = np.array(
                [os.path.join(dirpath, label) for label in self.labels])
        else:
            dstdirs = np.array([dirpath for _ in range(self.files.size)])

        # Take file absolute paths
        srcs = np.array(
            [os.path.join(self.datapath, filename) for filename in self.files])

        # Pack src files and directories for multiprocessing
        packed_srcs, packed_dstdirs = multiprocess.distribute(
            srcs, dstdirs, self.allowed_cpus, self.seed)
        return packed_srcs, packed_dstdirs

    def _distribute_datasets(self, tdstdir, vdstdir, tdata, vdata):
        '''
        Distribute the files to process and split them between the allowed
        CPUs. The distribution is returned as collections.

        PARAMETERS
        ----------
        (str) tdstdir:
            Absolute path to the destination files directory for the train
            dataset.
        (str) vdstdir:
            Absolute path to the destination files directory for the val
            dataset.
        (dict<str;str>) tdata:
            Train dictionary with filename as key and label as value.
        (dict<str;str>) vdata:
            Val dictionary with filename as key and label as value.

        RETURNS
        -------
        (list<list<str>>) packed_srcs:
            Source files absolute paths, per process.
        (list<list<str>>) packed_dstdirs:
            Destination directories absolute paths, per process.
        '''
        # Define srcs and dstdirs
        srcs, dstdirs = [], []
        for data, dirpath in zip([tdata, vdata], [tdstdir, vdstdir]):
            for filename, label in data.items():
                srcs.append(os.path.join(self.datapath, filename))
                dstdirs.append(os.path.join(dirpath, label))
        srcs = np.array(srcs)
        dstdirs = np.array(dstdirs)

        # Pack src files and directories for multiprocessing
        packed_srcs, packed_dstdirs = multiprocess.distribute(
            srcs, dstdirs, self.allowed_cpus, self.seed)
        return packed_srcs, packed_dstdirs

    def _run_processes(self, packed_srcs, packed_dstdirs, func, **kwargs):
        '''
        Run processes on the maximum amount of allowed CPUs to apply the "func"
        function to each source file. "func" needs "src" and "dstdir"
        parameters (in acutils, such functions are prefixed with "tmnt").
        **kwargs should be additional arguments to pass to the "func" function.

        PARAMETERS
        ----------
        (array/list like of iterables of str) packed_srcs:
            Source files absolute paths, per process.
        (array/list like of iterables of str) packed_dstdirs:
            Destination directories absolute paths, per process.
        (function) func:
            Treatment applied to each source file; it needs the absolute path
            to the source file "src" and the absolute path to the destination
            files directory "dstdir". In acutils, any function prefixed with
            "tmnt" is usable.
        **kwargs:
            Arguments to pass to the "func" function.

        RETURNS
        -------
        None
        '''
        multiprocess.run_processes_on_multiple_files(
            packed_srcs, packed_dstdirs, func, self.allowed_cpus, **kwargs)

    def _reset_directory(self, dirpath):
        '''
        Delete the directory if it exists, then create it again and fill it
        with empty subdirectories named after the unique labels (if defined and
        not empty).

        PARAMETERS
        ----------
        (str) dirpath:
            Absolute path to the directory to reset.

        RETURNS
        -------
        None
        '''
        file.reset_directory(dirpath, subs=self.unique_labels)

    def process(self, dirpath, func=None, empty_dir=True, **kwargs):
        '''
        Run processes on the maximum amount of allowed CPUs to apply the "func"
        function to each source file. If "func" is None, just copy the file.
        "func" needs "src" and "dstdir" parameters (in acutils, such functions
        are prefixed with "tmnt"). **kwargs should be additional arguments to
        pass to the "func" function.

        PARAMETERS
        ----------
        (str) dirpath:
            Absolute path to the treated files directory.
        (function) func=None:
            Treatment applied to each source file; it needs the absolute path
            to the source file "src" and the absolute path to the destination
            files directory "dstdir". In acutils, any function prefixed with
            "tmnt" is usable.
        (bool) empty_dir=True:
            If True, reset the destination directory and fill it with the
            unique labels as subdirectories, if defined.
        **kwargs:
            Arguments to pass to the "func" function.

        RETURNS
        -------
        None
        '''
        # Without a treatment to apply, copy source files
        if func is None:
            func = file.tmnt_copyfile_to_dir

        # Reset the treated files directory
        if empty_dir:
            self._reset_directory(dirpath)

        # Distribute files between CPUs and run processes
        packed_srcs, packed_dstdirs = self._distribute_data(dirpath)
        self._run_processes(packed_srcs, packed_dstdirs, func, **kwargs)
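
    # Hedged example (assumption, not library code): copying every loaded file
    # into a hypothetical output directory, one subdirectory per label, over
    # the allowed CPUs. Passing one of the acutils "tmnt_*" functions as
    # "func" would apply that treatment instead of a plain copy.
    #
    #   handler.process("/abs/path/to/processed")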

    def make_datasets(self, trainpath, valpath, tdata, vdata, func=None,
                      empty_dir=True, **kwargs):
        '''
        Run processes on the maximum amount of allowed CPUs to apply the "func"
        function to each source file. "func" needs "src" and "dstdir"
        parameters (in acutils, such functions are prefixed with "tmnt").
        **kwargs should be additional arguments to pass to the "func" function.

        PARAMETERS
        ----------
        (str) trainpath:
            Absolute path to the destination files train directory.
        (str) valpath:
            Absolute path to the destination files val directory.
        (dict<str;str>) tdata:
            Train dictionary with filename as key and label as value.
        (dict<str;str>) vdata:
            Val dictionary with filename as key and label as value.
        (function) func=None:
            Treatment applied to each source file; it needs the absolute path
            to the source file "src" and the absolute path to the destination
            files directory "dstdir". In acutils, any function prefixed with
            "tmnt" is usable.
        (bool) empty_dir=True:
            If True, reset the destination directories and fill them with the
            unique labels as subdirectories, if defined.
        **kwargs:
            Arguments to pass to the "func" function.

        RETURNS
        -------
        None
        '''
        # Without a treatment to apply, copy source files
        if func is None:
            func = file.tmnt_copyfile_to_dir

        # Reset train and val directories
        if empty_dir:
            self._reset_directory(trainpath)
            self._reset_directory(valpath)

        # Distribute files between CPUs and run processes
        packed_srcs, packed_dstdirs = self._distribute_datasets(
            trainpath, valpath, tdata, vdata)
        self._run_processes(packed_srcs, packed_dstdirs, func, **kwargs)
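
    # Hedged example (assumption, not library code): materializing the split
    # returned by "split" into hypothetical train and val directories.
    #
    #   handler.make_datasets("/abs/path/to/train", "/abs/path/to/val",
    #                         tdata, vdata)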

    def save_split(self, dst, data):
        '''
        Save a split (from the "split" method) as a json file.

        PARAMETERS
        ----------
        (str) dst:
            Absolute path to the new json file.
        (dict<str;str>) data:
            Data dictionary to save.

        RETURNS
        -------
        None
        '''
        file.save_dict_as_json(dst, data)

    @staticmethod
    def load_split(src):
        '''
        Load a dictionary from a json file.

        PARAMETERS
        ----------
        (str) src:
            Absolute path to the json file.

        RETURNS
        -------
        (dict<str;str>) data:
            Loaded data dictionary.
        '''
        # staticmethod: "src" is the only parameter, no instance state is needed
        return file.load_dict_from_json(src)
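
    # Hedged example (assumption, not library code): persisting a split and
    # reloading it later so the exact same train/val partition can be reused.
    #
    #   handler.save_split("/abs/path/to/tdata.json", tdata)
    #   tdata = DataHandler.load_split("/abs/path/to/tdata.json")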