#!/usr/bin/env python
# Import python modules
import os,sys,copy,warnings,itertools,inspect
import glob,json,jsonpickle,h5py,pickle,dill
import numpy as np
import pandas as pd
from natsort import natsorted, ns,index_natsorted,order_by_index
# Logging
import logging
[docs]logger = logging.getLogger(__name__)
# logger.setLevel(getattr(logging,log.upper()))
# Split path into directory,file,ext
[docs]def path_split(path,directory=False,file=False,ext=False,directory_file=False,file_ext=False,ext_delimeter='.'):
if not (directory or file or ext):
return path
returns = {'directory':directory,'file':file or directory_file or file_ext,'ext':ext}
paths = {}
paths['directory'] = os.path.dirname(path)
paths['file'],paths['ext'] = os.path.splitext(path)
if paths['ext'].startswith(ext_delimeter):
paths['ext'] = ext_delimeter.join(paths['ext'].split(ext_delimeter)[1:])
if not directory_file:
paths['file'] = os.path.basename(paths['file'])
if file_ext and paths['ext'].startswith(ext_delimeter):
paths['file'] = ext_delimeter.join([paths['file'],paths['ext']])
paths = [paths[k] for k in paths if returns[k]]
return paths if len(paths)>1 else paths[0]
# Join path by directories, with optional extension
[docs]def path_join(*paths,ext=None,abspath=False,ext_delimeter='.'):
path = os.path.join(*paths)
if ext is not None and not path.endswith('%s%s'%(ext_delimeter,ext)):
path = ext_delimeter.join([path,ext])
if abspath:
path = os.path.abspath(path)
return path
# glob path
[docs]def path_glob(path,**kwargs):
return glob.glob(os.path.abspath(os.path.expanduser(path)),**kwargs)
# Class wrapper for functions
[docs]class funcclass(object):
def __init__(self,func=lambda x:x):
self.func = func
[docs] def __call__(self,*args,**kwargs):
return self.func(*args,**kwargs)
# Serialize object to JSON
[docs]def serialize(obj,key='py/object'):
if callable(obj) or isinstance(obj,(slice,range)):
if callable(obj) and not inspect.isclass(obj):
obj = funcclass(obj)
obj = jsonpickle.encode(obj)
elif isinstance(obj,np.ndarray):
obj = obj.tolist()
return obj
# Deserialize object from JSON
[docs]def deserialize(obj,key='py/object'):
if isinstance(obj,dict) and key in obj:
obj = pickle.loads(str(obj[key]))
# return jsonpickle.decode(str(obj))
return obj
# Load data - General file import
[docs]def load(path,wr='r',default=None,verbose=False,**kwargs):
loaders = {**{ext: (lambda obj,ext=ext,**kwargs:getattr(pd,'read_%s'%ext)(obj,**kwargs)) for ext in ['csv']},
**{ext: (lambda obj,ext=ext,**kwargs:getattr(pd,'read_%s'%ext)(obj,**kwargs) if wr=='r' else (pickle.load(obj,**kwargs))) for ext in ['pickle']},
**{ext: (lambda obj,ext=ext,**kwargs: json.load(obj,**{'object_hook':deserialize,**kwargs})) for ext in ['json']},
}
if not isinstance(path,str):
return default
if path is None:
return default
ext = path.split('.')[-1]
if ('.' in path) and (ext in loaders):
paths = {ext: path}
else:
paths = {e: '%s.%s'%(path,e) for e in loaders}
loaders = {paths[e]: loaders[e] for e in paths}
for path in loaders:
loader = loaders[path]
for wr in [wr,'r','rb']:
try:
data = loader(path,**kwargs)
logger.log(verbose,'Loading path %s'%(path))
return data
except Exception as e:
try:
with open(path,wr) as obj:
data = loader(obj,**kwargs)
logger.log(verbose,'Loading obj %s'%(path))
return data
except:
pass
return default
# Dump data - General file save/export
[docs]def dump(data,path,wr='w',verbose=False,**kwargs):
dumpers = {**{ext: (lambda data,obj,ext=ext,**kwargs:getattr(data,'to_%s'%ext)(obj,**{'index':False,**kwargs})) for ext in ['csv']},
**{ext: (lambda data,obj,ext=ext,**kwargs:getattr(data,'to_%s'%ext)(obj,**kwargs) if isinstance(data,pd.DataFrame) else pickle.dump(data,obj,protocol=pickle.HIGHEST_PROTOCOL,**kwargs)) for ext in ['pickle']},
**{ext: (lambda data,obj,ext=ext,**kwargs: json.dump(data,obj,**{'default':serialize,'ensure_ascii':False,'indent':4,**kwargs})) for ext in ['json']},
**{ext: (lambda data,obj,ext=ext,**kwargs: obj.write(data,**kwargs)) for ext in ['tex']},
}
if path is None:
return
ext = path.split('.')[-1]
if ('.' in path) and (ext in dumpers):
paths = {ext: path}
else:
paths = {e: '%s.%s'%(path,e) for e in dumpers}
return
dumpers = {paths[e]: dumpers[e] for e in paths}
for path in dumpers:
dirname = os.path.abspath(os.path.dirname(path))
if not os.path.exists(dirname):
os.makedirs(dirname)
for path in dumpers:
dumper = dumpers[path]
for _wr in [wr,'w','wb']:
with open(path,_wr) as obj:
try:
dumper(data,path,**kwargs)
logger.log(verbose,'Dumping path %s'%(path))
return
except Exception as e:
try:
dumper(data,obj,**kwargs)
logger.log(verbose,'Dumping obj %s'%(path))
return
except Exception as e:
try:
# dumper(pickleable(copy.deepcopy(data),_return=True),path,**kwargs)
dumper(data,path,**kwargs)
except:
try:
# dumper(pickleable(copy.deepcopy(data),_return=True),obj,**kwargs)
dumper(data,obj,**kwargs)
except:
pass
return
# Check if object can be written to file
# Check if object can be pickled
[docs]def pickleable(obj,path=None,_return=False):
if isinstance(obj,dict):
pickleables = {k: pickleable(obj[k],path,_return=False) for k in obj}
for k in pickleables:
if not pickleables[k]:
obj.pop(k);
pickleables[k] = True
if _return:
return obj
else:
return all([pickleables[k] for k in pickleables])
ispickleable = False
if path is None:
path = '__tmp__.__tmp__.%d'%(np.random.randint(1,int(1e8)))
with open(path,'wb') as fobj:
try:
pickle.dump(obj,fobj)
ispickleable = True
except Exception as e:
pass
if os.path.exists(path):
os.remove(path)
return ispickleable
# Import Data as pandas dataframe
[docs]def importer(files,directory,wr='rb',verbose=False):
paths = []
for file in files:
path = path_join(directory,file,abspath=True)
paths.extend(path_glob(path,recursive=True))
paths = natsorted(paths)
data = [load(path,wr=wr,verbose=verbose) for path in paths]
if len(data) > 0:
df = pd.concat(data,axis=0,ignore_index=True)
return df
else:
return None
# Sort Data
[docs]def sorter(seq,index,multiple=False,wrapper=lambda arr:arr):
index = index_natsorted(index)
if multiple:
return [wrapper(order_by_index(s,index)) for s in seq]
else:
return wrapper(order_by_index(seq,index))
# Flattening multi-dimensional data
[docs]def _flatten(df,exceptions=[]):
if df is None:
return
for label in df:
if label in exceptions:
continue
data = np.array([i for i in df[label].values]) #.astype('float64')
if data.ndim<=1:
continue
shape = data.shape
labels = itertools.product(*[range(i) for i in shape[1:]])
for _label in labels:
d = data.reshape(*shape[1:],shape[0])
for i in _label:
d = d[i]
_label = '_'.join([label,*[str(i) for i in _label]])
df[_label] = d
df.drop(columns=label,inplace=True)
return
# Setup Data - Global function to call specific setup functions
[docs]def setup(data,metadata,files,directories__load,directories__dump=None,metafile=None,wr='rb',flatten_exceptions=[],verbose=False,**kwargs):
_setups = _setup()
_setup_ = _setups['default']
# Get loader depending on extension of files
for exts in _setups:
if any([path_split(file,ext=True) == exts for file in files]):
_setup_ = _setups[exts]
break
# Get regexed directories
if directories__dump is None:
directories__dump = directories__load.copy()
_directories(directories__load,directories__dump)
# Load Data
_setup_(data,metadata,files,directories__load,metafile,wr,flatten_exceptions,verbose,**kwargs)
for key in data:
directory_load = metadata[key].pop('directory')
for dir_load,dir_dump in zip(directories__load,directories__dump):
if dir_load in directory_load and len(dir_load)>=len(directory_load):
directory_dump = dir_dump
break
metadata[key]['directory'] = {'load':directory_load,'dump':directory_dump}
metadata[key]['type'] = 'imported'
return
[docs]def _directories(directories__load,directories__dump):
def replace(strings,patterns,threshold=None,delimeter=''):
if threshold is None:
threshold = len(patterns)
matches = []
nonmatches = []
for pattern in patterns:
if patttern in strings:
matches.append(pattern)
else:
nonmatches.append(pattern)
if ((isinstance(threshold,(int,np.integer)) and (len(matches)>=threshold)) or (callable(threshold) and threshold(matches))):
for i,(substring,pattern) in ennumerate(zip(strings,nonmatches)):
strings[i] = patterns
string = strings.join(delimeter)
return string
# Glob directories patterns
# Ensure directories exist
for directories in [directories__load,directories__dump]:
directories_split = [directory.split('*') for directory in directories]
directories_glob = natsorted([d for directory in directories
for d in path_glob(directory)])
if len(directories_glob) <= len(directories):
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory)
directories_glob = natsorted([d for directory in directories
for d in path_glob(directory)])
directories.clear()
directories.extend(directories_glob)
if len(directories__dump) < len(directories__load):
directories__dump.clear()
directories__dump.extend([directory for directory in directories__load])
for directories in [directories__load,directories__dump]:
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory)
return
[docs]def _setup():
# Default CSV/Pickle file input
def _default(data,metadata,files,directories,metafile=None,wr='rb',flatten_exceptions=[],verbose=False,**kwargs):
for directory in directories:
key = directory #os.path.basename(directory)
data[key] = importer(files,directory,wr=wr,verbose=verbose)
if data[key] is None:
data.pop(key)
continue
_flatten(data[key],flatten_exceptions)
metadata[key] = {}
if isinstance(metafile,str):
metadata[key] = load(path_join(directory,metafile),default=metadata[key],wr=wr,verbose=verbose)
metadata[key]['directory'] = directory
logger.log(verbose,'Importing: %s %r'%(key,data[key].shape if data[key] is not None else None))
return
# csv file input
def _csv(*args,**kwargs):
return _default(*args,**kwargs)
# csv file input
def _pickle(*args,**kwargs):
return _default(*args,**kwargs)
# HDf5 file input
def _h5(data,metadata,files,directories,metafile=None,wr='rb',flatten_exceptions=[],verbose=False,**kwargs):
def zero_check(x,val=0):
try:
return x[~np.equal(x,val)][0]
except IndexError:
return val
join = lambda *args,seperator = '/':seperator.join(["",*args])
name = lambda state: int(state.split('_')[-1])
for directory in directories:
paths = []
for file in files:
path = path_join(directory,file,abspath=True)
paths.extend(path_glob(path,recursive=True))
paths = natsorted(paths)
for path in paths:
h = h5py.File(path,'r')
keys = kwargs.get('keys',[{},{}])
groups = kwargs.get('groups',[])
states = [k for k in h.keys() if groups[0] in k]
views = {}
for s in states[:]:
labels = {**{keys[0][key]:join(s,key) for key in keys[0]},
**{keys[1][key]:join(s,*groups[1:],key)
for key in keys[1]}}
s = name(s)
views[s] = {}
for label in labels:
try:
dataset = h[labels[label]]
shape = dataset.shape
ndim = dataset.ndim
if ndim == 0:
views[s][label] = float(dataset[...])
elif ndim == 3:
views[s][label] = zero_check(dataset[...])
elif ndim == 4:
for i in range(shape[0]):
views[s]['%s_%d_%d'%(label,i,0)] = zero_check(dataset[i])
elif ndim == 5:
for i in range(shape[0]):
for j in range(shape[1]):
views[s]['%s_%d_%d'%(label,i,j)] = zero_check(dataset[i][j])
except KeyError:
pass
key = directory
df = pd.DataFrame.from_dict(views, orient='index')
df.index.name = join(*groups).split('_')[0].replace('/','')
df.reset_index(drop=False,inplace=True)
df.to_pickle(path.replace('h5','pickle'))
if data.get(key) is not None:
data[key] = pd.concat([data[key],df],axis=0,ignore_index=True)
else:
data[key] = df.copy()
if metadata.get(key) is None:
metadata[key] = {}
if isinstance(metafile,str):
metadata[key] = load(path_join(directory,metafile),default=metadata[key],wr='rb',verbose=verbose)
metadata[key]['directory'] = directory
logger.log(verbose,'Importing: %s [%d]'%(key,len(data[key])))
return
def _mat(data,metadata,files,directories,metafile=None,wr='r',flatten_exceptions=[],verbose=False,**kwargs):
loader = sp.io.loadmat
for directory in directories:
paths = []
for file in files:
path = path_join(directory,file,abspath=True)
paths.extend(path_glob(path,recursive=True))
paths = natsorted(paths)
for path in paths:
try:
_data = loader(path)
except:
return
labels = [k for k in _data if ((not k.startswith('__')) and (not k.endswith('__')))]
shapes = {k: np.array(_data[k].shape) for k in labels if all([s>1 for s in list(_data[k].shape)])}
labels = list(shapes)
dims = min([shapes[k].size for k in shapes])
shape = [max([shapes[k][i] for k in shapes]) for i in range(dims)]
keys = ['_'.join(str(j) for j in i) for i in itertools.product(*[range(i) for i in shape])]
data.update({k:pd.DataFrame() for k in keys if k not in data})
for key in keys:
_shape = [int(i) for i in key.split('_')]
for label in labels:
d = _data[label]
for i in _shape:
d = d[i]
d = np.squeeze(d.reshape(sorted(d.shape,reverse=True)))
if d.ndim>1:
d = [i for i in d]
data[key][label] = d
metadata[key] = {}
if isinstance(metafile,str):
metadata[key] = load(path_join(directory,metafile),metadata[key],wr='rb',verbose=verbose)
metadata[key]['directory'] = directory
return
# xarray file input
def _nc(data,metadata,files,directories,metafile=None,wr='r',flatten_exceptions=[],verbose=False,**kwargs):
try:
import xarray as xr
except:
return
for directory in directories:
paths = []
for file in files:
path = path_join(directory,file,abspath=True)
paths.extend(path_glob(path,recursive=True))
paths = natsorted(paths)
for path in paths:
df = xr.open_dataset(path).to_dataframe()
df.reset_index(drop=False,inplace=True)
df.rename(columns=lambda x:x.upper(),inplace=True)
inputs = ['BURNUP', 'FUELTEMP', 'MODTEMP', 'MODDENS', 'BORON','BANK_POS',
'FUELTEMP_AVG', 'MODTEMP_AVG', 'MODDENS_AVG', 'BANK_POS_AVG']
outputs = ['NXSF', 'XSF', 'XSRM', 'XSTR', 'XSS']
groupby = ['INGROUP', 'OUTGROUP']
label = 'SAMPLE'
values = itertools.product(*[df[g].unique() for g in groupby])
df_ = None
for value in values:
_df = df.copy()
_df = _df[(_df[groupby]==value).all(axis=1)]
_df.rename(columns={y: '%s_%s'%(y,'_'.join([str(x) for x in value])) for y in outputs},inplace=True)
if df_ is None:
df_ = _df.copy().drop(groupby,axis=1).reset_index(drop=True)
else:
df.reset_index()
df_ = pd.merge(df_,_df.drop(groupby,axis=1).reset_index(drop=True),on=[label,*inputs] if label in df_ else inputs,how='outer',copy=False).reset_index(drop=True)
df_ = df_.T.drop_duplicates().T
df = df_
if label in df and len(df[label].unique()) > 1:
df_groupby = df.groupby(label)
for key in df_groupby.groups:
key = '%s_%s'%(path,key)
data[key] = df_groupby.get_group(key).drop(label,axis=1).reset_index(drop=True)
metadata[key]['directory'] = directory
else:
key = path
data[key] = df.reset_index(drop=True)
metadata[key]['directory'] = directory
return
locs = locals()
funcs = {k[1:]:locs[k] for k in locs if callable(locs[k]) and k.startswith('_')}
return funcs