# Source code for cal_proc.generic

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
Generic instrument class.
"""


import datetime, pytz
import numpy as np
import os.path
import netCDF4

import pdb


[docs]def walk_dstree(ds): """Recursive Dataset group generator. from: http://unidata.github.io/netcdf4-python/netCDF4/index.html#section2 Args: ds (:obj:`netCDF4.Dataset`): Dataset """ values = ds.groups.values() yield values for value in ds.groups.values(): for children in walk_dstree(value): yield children
def append_time(otime,ntime,concat_axis=0):
    """Appends time variable/s to an existing time coordinate.

    Note that increasing the size of the ``time`` coordinate also increases
    the size of all of the dependent variables.

    Args:
        otime (:obj:`netCDF4.Variable`): Original Dataset time coordinate.
            As a coordinate its dimensionality is 1d.
        ntime (:obj:`netCDF4.Variable` or `iterable`): New time values to
            append to `otime`. May either be a netCDF4 variable or a simple
            iterable of values. These values may be datetime objects. If the
            values are strings some attempt to convert them will be done. If
            they are numbers then it is assumed that units and calendar are
            compatible with those in `otime`. Must not be an empty plain
            iterable (``ntime[0]`` is inspected).
        concat_axis (:obj:`int`): Axis along which to concatenate. Default
            is 0, the only sensible value for a 1d coordinate.

    Returns:
        Numeric time array with the same units and calendar as `otime`.
    """
    from dateutil import parser

    # Convert original times into datetime objects. A missing calendar
    # attribute defaults to the CF 'standard' calendar.
    ocalendar = getattr(otime, 'calendar', 'standard')
    try:
        odatetime = netCDF4.num2date(otime[:],otime.units,ocalendar)
    except IndexError:
        # num2date does not work on empty arrays
        odatetime = np.array([])

    if isinstance(ntime, netCDF4.Variable):
        # Convert new times into datetime objects using the new variable's
        # own units/calendar
        ncalendar = getattr(ntime, 'calendar', 'standard')
        try:
            ndatetime = netCDF4.num2date(ntime[:],ntime.units,ncalendar)
        except IndexError:
            # num2date does not work on empty arrays
            ndatetime = np.array([])
    else:
        if isinstance(ntime,(int,float,str)):
            # Treat a bare scalar as a length-1 array
            ntime = np.array([ntime])
        # Guard the dtype access: plain lists/tuples have no .dtype, which
        # previously raised AttributeError for any non-string sequence.
        if isinstance(ntime[0], str) or \
           (hasattr(ntime, 'dtype') and ntime.dtype.kind in ['U','S']):
            # Attempt to convert the times into a list of datetime objects
            # Assume that all formats are the same!
            ndatetime = [parser.parse(n_, dayfirst=True) for n_ in ntime]
        elif isinstance(ntime[0], datetime.datetime):
            # Already datetime objects; use directly (docstring-promised
            # input that previously crashed on the dtype lookup)
            ndatetime = ntime
        else:
            # Assume numbers in the same units/calendar as otime
            ndatetime = netCDF4.num2date(ntime,otime.units,ocalendar)

    adatetime = np.ma.concatenate((odatetime,ndatetime),axis=concat_axis)
    return netCDF4.date2num(adatetime,otime.units,ocalendar)
class Generic():
    """Parent class for general instrument parsing and processing.

    Generic forms the basis for all specific instrument processor classes.
    """

    def __init__(self,ds):
        """
        Args:
            ds (:obj:`netCDF4.Dataset`): Dataset from ingested cal_nc file

        .. note::
            I don't think that this is actually true anymore...
            Note that cal_nc file has been read using r+. Thus variables (?)
            and attributes cannot be appended to. Values must be read to a
            python variable, the nc key deleted, then rewritten with
            appropriate modifications.
        """
        self.ds = ds

    def __str__(self):
        """Returns help text for this processor.

        The class docstring is combined with the docstring of an
        ``update()`` method and the output of an ``_add__str__()`` method
        where a subclass provides them. A class providing neither simply
        returns its own docstring.

        Fix: the previous implementation dropped into ``pdb.set_trace()``
        on the perfectly normal fall-back paths (any subclass without
        ``update()`` or ``_add__str__()``), hanging non-interactive runs.
        """
        # Class docstring forms the basis of the help text.
        # To print name of instance use: type(self).__name__
        help_txt = self.__doc__
        try:
            # Subclasses may document their keyword arguments in update()
            help_txt = help_txt + '\n' + self.update.__doc__
        except AttributeError:
            # No update() method; keep just the class docstring
            pass
        try:
            # Subclasses may provide extra help via _add__str__()
            help_txt = help_txt + '\n' + self._add__str__()
        except AttributeError:
            pass
        return help_txt
[docs] def update_ver(self): """Includes program version information as root attribute. Version information is determined from ``cal_proc.__init__()``. Any existing version strings shall be overwritten. """ from cal_proc import __version__ self.software_version = __version__ self.ds.software_version = __version__
[docs] def update_hist(self,update=None): """Updates the global history attribute. The history nc attribute is a single string of comma-delineated text. Args: update (:obj:`str` or :obj:`list`): Update for history string. If None (default) then auto-generate string based on today's datetime. If given then append update/s to history attribute string. Any ``<now>`` or ``<today>`` strings are changed to today's datetime. """ # With datetime v3.6 can use timespec='seconds' to drop ms # Timezone aware timestamp is generated by default. t_ = datetime.datetime.now(pytz.utc).replace(microsecond=0).strftime('%Y%m%dT%H%M') if update is None: update = '{} Auto update'.format(t_) elif update is 'NA': # This assumes that all updates have been handled in the cdl # file. So nothing needs to be done here. update = '' elif hasattr(update,'__iter__') and type(update) not in [str]: # If is a list of strings then join update = ', '.join(update[:]) # Change any shortcuts to today's date update = update.replace('<today>',t_).replace('<now>',t_) try: hist_ = self.ds.history except AttributeError as err: # username attribute does not exist so create it self.ds.history = update else: # username attribute already exists to append to end of string del(self.ds.history) if hist_ in ['',None]: self.ds.history = update else: self.ds.history = '{}, {}'.format(hist_,update)
[docs] def update_user(self,update=None): """Updates the global username attribute. The username nc attribute is a single string of comma-delineated text. Args: update (:obj:`str` or :obj:`list`): Update for username string. If None (default) then auto-generate string based on previous entries in netCDF and ask user. String usually given as `username <user@email>`. Append username/s to existing attribute string. """ # Extract existing username from ds try: user_ = self.ds.username except AttributeError as err: # username attribute does not exist so create it user_ = None last_user_ = None else: last_user_ = user_.split(',')[-1].strip() if update in ['',None]: # No username given so use last entry from nc if possible but # confirm with user if last_user_ in ['',None]: while update in ['',None]: update = input("\nEnter 'username <email>': ").strip() else: update = input("\nEnter 'username <email>' [enter for {}]: ".format(last_user_)).strip() if update == '': update = last_user_ # elif update is 'NA': # # This assumes that all updates have been handed in the cdl # # file. So nothing needs to be done here # update = '' elif hasattr(update,'__iter__') and type(update) not in [str]: # If is a list of strings then join update = ', '.join(update[:]) if user_ in ['',None]: self.ds.username = update else: # username attribute already exists to append to end of string del(self.ds.username) self.ds.username = '{}, {}'.format(user_,update)
[docs] def update_attr(self,attr,update=None): """Updates an attribute by appending update. Root and group attributes are generally strings and should not be changed. However they may be appended to, it is common to create a comma-delineated string. If attr does not exist then it is not created by this method. If a new attribute is required then it is more sound to create a new nc file from scratch that includes this attribute. Args: attr (:obj:`str`): Name of attribute to update. If the attribute is in a group instead of the root then the full path of the attribute must be included with / seperators. If attr does not exist within the dataset then do not create but return. update (:obj:`str` or :obj:`list`): Update for attribute. If None (default) then just return. If string then append to existing attr string with comma seperator. If list of strings then append comma-delineated string generated from list. """ if update in [None,'']: # No updates to be made return # Extract existing attribute from ds grp_, attr_ = os.path.split(attr) try: attr_old = self.ds.getncattr(attr) except AttributeError as err: try: attr_old = self.ds[grp_].getncattr(attr_) except AttributeError as err: # Attribute as given does not exist so do nothing print('Attribute {} does not exist.'.format(attr)) return else: # Delete existing group attribute self.ds[grp_].delncattr(attr_) else: # Delete existing root attribute self.ds.delncattr(attr) if hasattr(update,'__iter__') and type(update) not in [str]: # If is a list of strings then join update = ', '.join(update[:]) # Append update to existing attribute string and rewrite into ds if grp_ == '': self.ds.setncattr(attr,'{}, {}'.format(attr_old,update)) else: self.ds[grp_].setncattr(attr_,'{}, {}'.format(attr_old,update))
    def change_val(self,var,old_val,new_val):
        """Changes a single variable/attribute value.

        The variable or attribute name must be given along with the old
        value, ``old_val``, that is to be changed to ``new_val``. If
        ``old_val`` is not found then nothing is done.

        Args:
            var (:obj:`str`): Path/name of the variable or attribute to
                modify.
            old_val: Existing value that is to be replaced.
            new_val: Replacement value.

        .. todo:: Not implemented — this is currently a no-op stub.
        """
        pass
    def _add_coord(self,coord,vals):
        """Adds extra values to the end of an unlimited coordinate.

        If a coordinate is increased in size then all of the variables that
        depend on that coordinate are increased to the same size along the
        unlimited dimension. This internal method should usually be
        followed by ``_add_var()``.

        Args:
            coord (:obj:`str`): string of path/name of coordinate variable.
                This can be found with
                ``os.path.join(self.ds[coord].group().path,self.ds[coord].name)``
            vals (`iterable`): Iterable of values to append to the end of
                ``self.ds[coord]``. Type must match the dtype of the
                coordinate variable.

        Returns:
            0 on success, -1 if ``coord`` is not an unlimited coordinate.
        """
        # Ensure that is unlimited coordinate
        # Annoyingly have to split path and variable name
        cpath,cname = os.path.split(coord)
        if cpath in ['','/']:
            # Root-group dimension
            unlim = self.ds.dimensions[cname].isunlimited()
        else:
            unlim = self.ds[cpath].dimensions[cname].isunlimited()
        if (self.ds[coord].ndim != 1) or \
           (self.ds[coord].name != self.ds[coord].dimensions[0]) or \
           (unlim is False):
            # Either the variable name passed is not a coordinate (ie the
            # variable name and dimension are the same and 1d) or the
            # coordinate is not unlimited and thus cannot be extended.
            print('{} is not an unlimited coordinate'.format(coord))
            return -1
        # Ensure vals is an array and preserve any masking
        vals = np.ma.atleast_1d(vals)
        try:
            # NOTE(review): assumes the coordinate is a time coordinate —
            # append_time() reads .units on it. Non-time unlimited
            # coordinates would fail here; confirm against callers.
            self.ds[coord][:] = append_time(self.ds[coord],vals,0)
        except Exception as err:
            print(err)
            pdb.set_trace()
        return 0

    def _parent_dim(self, var, unlim=True):
        """Returns paths/dimensions of given variable even if in parent group.

        Walks up through parent groups to resolve all of the dimensions of
        ``var``, since netCDF4 variables may use dimensions defined in an
        ancestor group.

        Args:
            var (:obj:`str`): Full path to the variable name.
            unlim: Selects the return form. If True (default), return only
                the paths of unlimited dimensions. If ``'index'``/``'idx'``,
                return the list of booleans flagging unlimited dimensions.
                If ``'all'``/``'a'``, return ``(paths, unlimited_flags)``.
                Any other string returns all dimension paths.
        """
        # Names of found dimensions that match those of var
        fnd_dims = []
        fnd_unlim = []
        _path, _ = os.path.split(var)
        while len(fnd_dims) < len(self.ds[var].dimensions):
            # Walk up through groups attempting to find required dimensions
            try:
                _dims = [d for d in self.ds[_path].dimensions
                         if d in self.ds[var].dimensions]
            except (KeyError, IndexError) as err:
                # Step up to parent group
                break
                # NOTE(review): the statement below is unreachable (after
                # break) and, if the try failed, the finally clause will
                # raise NameError on the unbound _dims. Needs a fix.
                _dims = [d for d in self.ds.dimensions
                         if d in self.ds[var].dimensions]
            finally:
                _unlim = [self.ds[_path].dimensions[d].isunlimited()
                          for d in _dims]
                fnd_dims.extend([os.path.join(_path,d) for d in _dims])
                fnd_unlim.extend(_unlim)
                # Step up to the parent group path for the next iteration
                _path = os.path.split(_path)[0]
        if len(fnd_dims) < len(self.ds[var].dimensions):
            # Remaining dimensions assumed to live in the root group
            fnd_dims.extend([d for d in self.ds.dimensions
                             if d in self.ds[var].dimensions])
            # NOTE(review): _dims holds dimension *name* strings which have
            # no isunlimited(); this line looks like it should iterate the
            # root group's Dimension objects instead — confirm.
            fnd_unlim.extend([d.isunlimited() for d in _dims])
        if unlim == True:
            # Only the unlimited dimension paths
            return np.array(fnd_dims)[fnd_unlim].tolist()
        elif unlim.lower() in ['index','idx']:
            # Return an index list of unlimited/fixed dimensions
            return fnd_unlim
        elif unlim.lower() in ['all', 'a']:
            return fnd_dims, fnd_unlim
        else:
            return fnd_dims

    def _add_var(self, var, vals):
        """Adds extra values to end of an already extended variable.

        If a coordinate is increased in size then all of the variables that
        depend on that coordinate are increased to the same size along the
        unlimited dimension. If the variable holds numbers then these extra
        values are masked. If the variable holds strings then they are just
        empty strings and there is no masking. This method writes the new
        values in vals into these new array positions of var.

        If the number of values is > number of new elements at the end of
        the array then it is assumed that there is an error and no action
        is taken.

        Args:
            var (:obj:`str`): string of path/name of variable. This can be
                found with
                ``os.path.join(self.ds[var].group().path,self.ds[var].name)``
            vals (`iterable`): Iterable of values to append to the end of
                ``self.ds[var]``. Type must match the dtype of the
                variable.
        """
        # Ensure vals is an array and preserve any masking
        vals = np.ma.atleast_1d(vals)
        if vals.ndim == self.ds[var].ndim - 1:
            # Assume that only a single element in the unlimited dimension
            # has been given in vals. Increase number of dimension to match
            # var. This assures (?) that 0 axis is the unlimited one...
            vals = np.expand_dims(vals[::],axis=0)
        assert vals.ndim == self.ds[var].ndim, \
            'Mismatch in number of dimensions'
        # Annoyingly have to split path and variable name
        vpath,vname = os.path.split(var)
        # Find unlimited dimension/s for this var
        #print(var)
        dims, unlim = self._parent_dim(var, 'all')
        # NOTE(review): `unlim is []` is always False (identity comparison
        # with a fresh list) so this guard never fires, and `err` is
        # undefined here (would raise NameError if it did). Should be
        # `if not unlim:` without the print(err).
        if unlim is []:
            print(err)
            print('Variable {} does not have unlimited dimension'.format(var))
            return
        # Create slice to write to the correct dimension
        # That is the last elements of the unlimited dimension
        unlim_idx = [i for i,v in enumerate(unlim) if v == True]
        if len(unlim_idx) > 1:
            # There is more than one unlimited dimension. This is allowed in
            # nc4. Currently there is no code to automatically determine
            # which dimension val should be added to. Therefore ask user for
            # clarity, this is a bit rubbish so maybe _add_var() should have
            # an additional arg for the dimension to extend??
            print('\nMore than one UNLIMITED dimension found for variable: ' +
                  '\n {}'.format(var))
            req_unlim_idx = None
            while req_unlim_idx is None:
                for _i,_d in enumerate(np.array(dims)[unlim_idx]):
                    print(' {}: {}'.format(_i, _d))
                req_unlim_idx = input('Enter number of dimension to use [0]: ')
                if req_unlim_idx == '':
                    # Default to the first unlimited dimension
                    req_unlim_idx = 0
                # NOTE(review): range(_i) excludes the final valid index
                # (_i is the last enumerate value, len-1) — looks like an
                # off-by-one; confirm intended range.
                elif int(req_unlim_idx) in range(_i):
                    req_unlim_idx = int(req_unlim_idx)
                else:
                    req_unlim_idx = None
            unlim_idx = unlim_idx[req_unlim_idx]
            print('Adding {} to dimension: {}\n'.format(var,dims[unlim_idx]))
        else:
            unlim_idx = unlim_idx[0]
        if self.ds[var].ndim == 1:
            # 'empty' slots filled from first position towards array end,
            # thus the -1*len(vals)
            idx = slice(-1*len(vals),None)
        else:
            # This is not as easy as one might think!
            assert unlim_idx == 0
            # The lines below assume that the unlimited dimension that is
            # being extended is the 0th. The other indexers are set to write
            # only to as many elements in each dimension as required (those
            # outside this range remain masked). This is to cope with ragged
            # arrays.
            # However, it is likely/possible that this shall not always be
            # the case. I have not been able to test this case yet, thus the
            # assert
            idx = [slice(0,i) for i in vals.shape]
            idx[unlim_idx] = -1
            #idx[1] = range(len(vals[0]))
        if (self.ds[var].dtype in [str]) and \
           ((self.ds[var][idx] == '').all()):
            # Strings are vlen arrays which have more limited access
            # Need to write each individual string seperately (?)
            for i,val in enumerate(vals[::-1]):
                # Use string format to convert (also converts None)
                self.ds[var][-1*(i+1)] = '{}'.format(val)
        elif any((self.ds[var][idx].mask.all(),
                  np.isfinite(self.ds[var][idx].base).all())):
            # Assume that all other types of variables are masked
            try:
                self.ds[var][idx] = vals[0]
            except Exception as err:
                print('Variable: {}'.format(var))
                pdb.set_trace()
                print(err)
        else:
            print('Insufficient empty space in {}!'.format(var))
    def append_var(self,var,coord_vals,var_vals):
        """Method to append to an existing netCDF4 variable and associated coord.

        .. todo:: Redundant/not currently used. The body below is entirely
            commented out; calling this method is a no-op.

        The extra coordinate values are appended to the coordinate of vname
        in self. This automatically creates the same number of masked
        entries in all of the variables that depend on that coordinate.

        :param var: String of path/name of variable. This can be found with
            ``os.path.join(self.ds[var].group().path,self.ds[var].name)``
        :type var: String
        :param coord_vals: Iterable of values to append to the end of
            unlimited coordinate of variable var.
        :type coord_vals: List or array
        :param var_vals: Iterable of values to append to the end of var.
            Must be the same length as coord_vals in the unlimited
            dimension.
        :type var_vals: List or array

        .. note::
            This is a bit complicated/not sensible. It is possible/probable
            to add a variable (along with the coord) then append to a
            different variable that uses the same coordinate which then
            writes the same coordinate values into the coordinate again.
        """
        # NOTE(review): commented-out draft implementation kept below for
        # reference; remove once append_dict()/_add_var() are confirmed to
        # cover all use cases.
        # # Find unlimited dimension of variable
        # for i,dim_ in enumerate(var.dimensions):
        #     if var.isunlimited():
        #         break
        # if type(nvar) == netCDF4.Variable:
        #     # Extract values out of variable
        #     nvars = nvar[:]
        # elif isinstance(nvar,(int,float,str)):
        #     # Ensure that new variables is a list if not a netCDF4 variable
        #     nvars = [nvar]
        # if ovar.dtype == str:
        #     avar = ovar.rstrip(',') + ','.join(nvars)
        # else:
        #     avar = np.ma.concatenate((ovar[:],nvars), axis=i)
    def append_dict(self,var_d):
        """Appends multiple variables with a single coordinate.

        Multiple variable values that use the same coordinate can be
        appended to existing dataset variables in one go. Variables that do
        not already exist in the dataset are ignored. Use add instead...

        Args:
            var_d (:obj:`dict`): Dictionary of multiple variable values to
                be appended. Dictionary keys are the variable path+name
                strings and the dictionary values are either a netCDF
                variable or a sub-dictionary of values and attributes.

        .. todo:: Currently does not accept netCDF4 variable values.
            Currently only accepts iterable of data.

        .. code-block:: python

            var_d = {coord: netCDF4.Variable,
                     var1: [1,2,3,4,5],
                     var2: {'_data': [1,2,3,4,5],
                            'var2_attr1': 'var2 attribute 1',
                            ...},
                     var3: 'Fred'}

        Note that all variables should be the same length if they are
        list-like. Any variables not the same length as the maximum length
        variable will be broadcast so that they are longer. This could well
        have unintended consequences however it does mean that variables
        that are the same thing repeated for all coordinate values (eg
        var3) will be replicated automatically.

        .. NOTE:: This is not done yet!

        There is nothing special about the coordinate variable, the
        function identifies the coordinate as being the variable with the
        same name as its dimension.

        Returns:
            -1 if appending to any coordinate fails, otherwise None.
        """
        # Find coordinate variable
        coord_d = {}
        # Iterate over a snapshot of the keys as var_d is mutated below
        var_keys = list(var_d.keys())
        for k_ in var_keys:
            try:
                if self.ds[k_].name == self.ds[k_].dimensions[0]:
                    # Coordinates can only have one dimension, thus the [0]
                    # Coordinates cannot use a parent group dimension
                    coord_d[k_] = var_d.pop(k_)
            except IndexError as err:
                # Variable does not exist in dataset so remove from var_d
                # (netCDF4 raises IndexError for an unknown variable path)
                _ = var_d.pop(k_)
        # Extend the coordinate/s first; this grows all dependent variables
        for k_,v_ in coord_d.items():
            err = self._add_coord(k_,v_)
            if err == -1:
                return -1
        # Then write the new values into the freshly created empty slots
        for k_,v_ in var_d.items():
            try:
                self._add_var(k_,v_)
            except Exception as err:
                print('Variable: {}'.format(k_))
                pdb.set_trace()
                print(err)
    def append_dataset(self,ds,
                       force_append=['username','history'],
                       exclude=[]):
        """Adds groups, attributes, dimensions, and variables from ds.

        Attributes of ``self.ds`` shall take priority over those of the
        same name in `ds`, such attribute values of `ds` shall be ignored.
        The exception is if the attribute key is included in
        ``force_append``. In this case the resultant attribute shall be a
        comma-delineated combination string of the individual attributes
        with that from ``ds`` being appended to that of ``self.ds``.

        Variables from ``ds`` are appended to the same variable in
        ``self``. The variables are sorted by the unlimited dimension.
        Variables only in ds shall be added to ``self``.

        Any groups, attributes, or variables in ``ds`` that are not to be
        added or appended can be specified as a list with `exclude`.

        Args:
            ds (:obj:`netCDF4.Dataset`): netCDF Dataset to add into
                ``self.ds``.
            force_append (:obj:`list`): List of any root or group attribute
                strings that should always be appended to, even if they are
                identical. Default is ['username','history']. Group
                attribute strings must include full path.
                NOTE(review): mutable default argument — safe only while
                never mutated in here; consider ``None`` sentinel.
            exclude (:obj:`list`): List of attribute or variable name
                strings (but not variable attributes) that are not to be
                added or appended to.
                NOTE(review): ``exclude`` is accepted but never used in the
                body below — confirm whether filtering was intended.
        """
        def append_group(mgrp,ngrp):
            """Updates master ds group with values from new ds group.

            Either input may be a dataset, in which case the root group is
            operated on, or a group/subgroup within the dataset. This
            function does not walk down through any subsequent groups.

            Args:
                mgrp (:obj:`netCDF4.Dataset`): Master dataset object which
                    may be root or a group.
                ngrp (:obj:`netCDF4.Dataset`): Dataset object the contents
                    of which shall be added or appended to those in the
                    master dataset object.
            """
            # Add any new attributes, ignore any conflicts, string append
            # any others
            new_attrs = {k_:v_ for (k_,v_) in ngrp.__dict__.items() \
                         if k_ not in mgrp.ncattrs()}
            app_attrs = {k_:v_ for (k_,v_) in ngrp.__dict__.items() \
                         if (k_ in mgrp.ncattrs() and
                             v_ != mgrp.getncattr(k_) and
                             k_ in force_append)}
            mgrp.setncatts(new_attrs)
            for k_,v_ in app_attrs.items():
                if mgrp.getncattr(k_) == '':
                    # Nothing to append to; just take the new value
                    app_attr = ngrp.getncattr(k_)
                else:
                    app_attr = ', '.join([mgrp.getncattr(k_)[::],
                                          ngrp.getncattr(k_)])
                mgrp.setncattr_string(k_,app_attr)

            # Add any new dimensions
            new_dim = {d_:v_ for (d_,v_) in ngrp.dimensions.items()
                       if d_ not in mgrp.dimensions}
            mgrp.dimensions.update(new_dim)

            # Add any new variables
            new_var = {n_:v_ for (n_,v_) in ngrp.variables.items() \
                       if n_ not in mgrp.variables}
            mgrp.variables.update(new_var)

            # Concatenate any variables along the unlimited dimension
            # that exist in master already. Do this in two steps as
            # operating directly on the dataset coordinate/s affects the
            # dependent variables immediately.
            # Note that new/changed variable attributes are not added.
            # NOTE(review): debug scaffolding — `fred` is unused and the
            # bare except drops into pdb; should be removed/narrowed once
            # the array_equal issue is understood.
            for n_ in ngrp.variables.keys():
                try:
                    fred = np.array_equal(ngrp.variables[n_][:],
                                          mgrp.variables[n_][:])
                except:
                    print('Error with array_equal')
                    pdb.set_trace()
            app_var = {n_:v_ for (n_,v_) in ngrp.variables.items() \
                       if all([n_ in mgrp.variables,
                               not np.array_equal(ngrp.variables[n_][:],
                                                  mgrp.variables[n_][:])])}
            mod_var = {}
            for n_,v_ in app_var.items():
                # Find unlimited dimension
                # NOTE(review): if no dimension is unlimited, `i` is left at
                # the last index and concatenation happens on that axis —
                # confirm that every appended variable has an unlimited dim.
                for i,d_ in enumerate(mgrp.variables[n_].dimensions):
                    if mgrp.dimensions[d_].isunlimited():
                        break
                # Convert datetime stamps to datetime then back again
                # ensuring units are those of the master group.
                # Determine if timestamp with variable name and units that
                # include 'since'. A bit flakey but hopefully ok.
                if all(['time' in n_.lower(),
                        'units' in mgrp.variables[n_].ncattrs()]) \
                   and 'since' in mgrp.variables[n_].units.lower():
                    mod_var[n_] = append_time(mgrp.variables[n_],
                                              ngrp.variables[n_])
                else:
                    ### TODO: Use pint to make sure any units in variables
                    ### are comparable and convert new variables to those
                    ### used in master
                    mod_var[n_] = np.ma.concatenate((mgrp.variables[n_][:],
                                                     ngrp.variables[n_][:]),
                                                    axis=i)
            # Write modified variables back into master
            for n_,v_ in mod_var.items():
                mgrp.variables[n_][:] = v_

        # Determine path strings to all (sub-)groups in both datasets
        mgrps = []
        for grps in walk_dstree(self.ds):
            mgrps.extend([g_.path for g_ in grps])
        ngrps = []
        for grps in walk_dstree(ds):
            ngrps.extend([g_.path for g_ in grps])

        # Determine groups that are in the new dataset that are not in the
        # master. Create an equivalent empty group in the master, group
        # will be filled by calling append_group(). Sort list by length of
        # string so create upper level groups before any sub-groups.
        for grp in sorted(set(ngrps).difference(mgrps),key=len):
            self.ds.createGroup(grp)

        # Copy any new root attributes, dimensions, and/or variables to
        # master
        append_group(self.ds,ds)

        # Do the same for all groups and sub-groups
        for grp in ngrps:
            append_group(self.ds[grp],ds[grp])