Source code for clasp.script_tools

# Copyright (c) 2018 Stephen Wasilewski
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================

"""library of functions helpful for cli script development and parallel
computing particulary with subprocess calls."""
import sys
import shlex
import subprocess
import inspect
import tempfile
from glob import glob
import os
import re
import math
from concurrent.futures import ProcessPoolExecutor, as_completed
from clasp import click


encoding = sys.stdin.encoding
if encoding is None:
    encoding = 'UTF-8'


[docs]def try_mkdir(s): """silently ignore exceptions on mkdir""" try: os.mkdir(s) except Exception: pass
[docs]def arange(start, stop=None, step=1): """like numpy.arange for integers""" if stop is None: stop = start start = 0 n = int(math.ceil((stop - start)/step)) return [start + step*i for i in range(n)]
[docs]def int_rng(s): """expand start:end:inc notation into range""" result = [] for part in s.split(): if ':' in part: a = (int(i) for i in part.split(':')) result.extend(arange(*a)) else: a = int(part) result.append(a) if len(result) == 1: result = [int(result[0])] return result
[docs]def rm_dup(seq): """removes duplicates from list while preserving order""" mark = set() mark_add = mark.add return [x for x in seq if not (x in mark or mark_add(x))]
[docs]def warn_match(kwargs, sargs): for i in sargs: if i not in kwargs: click.echo('WARNING: {} not set'.format(i), err=True)
[docs]def kwarg_match(func, kwargs, debug=False): """filters dict for keys used by func""" sargs = inspect.getfullargspec(func).args argsc = {i: kwargs[i] for i in sargs if i in kwargs} if debug: warn_match(kwargs, sargs) return argsc
[docs]def arg_match(func, kwargs, *args): """filters dict for positional arguments used by func""" sargs = inspect.getfullargspec(func).args[len(args):] argsc = list(args) + [kwargs[i] if i in kwargs else None for i in sargs] return argsc
[docs]def kwarg_arg(func, kwargs, skip=None): """returns ordered list of optional arg values""" spec = inspect.getfullargspec(func) if skip is not None: oargs = spec.args[skip:] else: oargs = spec.args[-len(spec.defaults):] largs = [] for oarg, default in zip(oargs, spec.defaults): try: largs.append(kwargs[oarg]) except Exception: largs.append(default) return largs
[docs]def crossref(l1, l2): '''return all possible pairs of 2 lists''' n = len(l1) * len(l2) out = [[] for i in range(n)] for i, l in enumerate(l1): for j, m in enumerate(l2): out[i*len(l2)+j] += [l, m] return [flat_list(i) for i in out]
[docs]def crossref_all(l, followers=[]): '''return all possible combos of list of lists''' la = [] followed = [i[0] for i in followers] follows = [i[1] for i in followers] leaders = [i for i in range(len(l)) if i not in follows] for i in leaders: if i == 0: if i in followed: k = follows[followed.index(i)] try: la = [[lb, l[k][j]] for j, lb in enumerate(l[i])] except Exception: click.echo('length of follower must match lead', err=True) raise click.Abort() else: la = l[i] else: if i in followed: lb = crossref(la, list(range(len(l[i])))) k = follows[followed.index(i)] try: la = [j[:-1] + [l[i][j[-1]], l[k][j[-1]]] for j in lb] except IndexError: click.echo('length of follower must match lead', err=True) raise click.Abort() else: la = crossref(la, l[i]) return list(zip(*la))
[docs]def subpipe(commands): ''' parses special syntax in pipe expressions | $(some command) executes to a temporary file whose path is inserted in | the command. | $((expression)) evaluates a arithmetic expression in place +-*/() ''' temps = [] commands = flat_list([i.split("|") for i in commands]) for i, command in enumerate(commands): if re.match(r'.*\$\(.+\).*', command): subs = re.findall(r'\$\(.+\)', command) for sub in subs: if "$((" in sub: pt = sub[:] while "$((" in pt: si = pt.rfind("$((") op = 2 sj = si+3 for s in pt[sj:]: if op == 0: break if s == "(": op += 1 if s == ")": op -= 1 sj += 1 try: exp = pt[si+1:sj] pt = pt[:si] + str(eval(exp, {}, {})) + pt[sj:] except Exception: click.echo("bad expression: {}".format(exp)) raise click.Abort else: f, pt = tempfile.mkstemp(dir="./", prefix='clasp_tmp') temps.append(pt) pipeline([sub.strip('$()')], outfile=pt) commands[i] = command.replace(sub, pt) return temps, commands
[docs]def pipeline(commands, outfile=None, inp=None, close=False, cwd=None, writemode='w', forceinpfile=False, caperr=False): """ executes pipeline of shell commands (given as list of strings) special syntax: | $(some command) executes to a temporary file whose path is inserted in | the command. | $((expression)) evaluates a arithmetic expression in place +-*/() Parameters ---------- commands: list list of commands to execute in order outfile: writeable file object optional destination for stdout inp: str or filebuffer string to feed to stdin at start of pipeline close: bool if true closes file object before returning cwd: str directory to execute pipeline (temp files and Popen cwd) writemode: str passed to open() for outfile ('w', 'wb' for write or 'a' for append) forceinpfile: bool always treat inp as a file, if a string, open the path for reading Returns ------- out: str returns stdout of pipeline (will be None if outfile is given) """ temps, commands = subpipe(commands) pops = [0]*len(commands) if outfile is not None and not hasattr(outfile, 'read'): if cwd is not None: outfile = open(cwd + "/" + outfile, writemode) else: outfile = open(outfile, writemode) for i in range(len(commands)): if i == 0: if hasattr(inp, 'read'): strin = False stdin = inp elif forceinpfile and inp is not None: strin = False stdin = open(inp, 'r') elif inp is not None: strin = True stdin = subprocess.PIPE else: strin = True stdin = None else: stdin = pops[i-1].stdout if i == len(commands) - 1 and outfile is not None: stdout = outfile else: stdout = subprocess.PIPE try: if caperr: pops[i] = subprocess.Popen(shlex.split(commands[i]), stdin=stdin, stdout=stdout, cwd=cwd, stderr=subprocess.PIPE) else: pops[i] = subprocess.Popen(shlex.split(commands[i]), stdin=stdin, stdout=stdout, cwd=cwd) except OSError: message = "invalid command / no such file: {}".format(commands[i]) raise OSError(2, message) try: inp = inp.encode(encoding) except Exception as e: pass if len(commands) == 1 and strin: out = pops[0].communicate(inp) else: if inp is not None and strin: pops[0].stdin.write(inp) pops[0].stdin.close() out = pops[-1].communicate() try: output = out[0].decode(encoding) except Exception: output = out[0] pass if close: outfile.close() for temp in temps: os.remove(temp) if caperr: return output, out[1] else: return output
[docs]def flat_list(l): """flattens any depth list""" a = [] try: if type(l) == list: for i in l: a += flat_list(i) else: a.append(l) except Exception: a.append(l) return a
[docs]def pool_call(func, args, kwargs={}, cwd=None, order=True, expand=False, handle=False, test=False): """ execute func with concurrent.futures return output Parameters ---------- func: python function function to execute args: list of tuples each set is mapped to function kwargs: dict constant keyword args for func cwd: str directory in which to execute function calls order: bool whether to maintain order of input expand: bool whether to expand items in args to map to function args handle: bool whether to return future objects or results Returns ------- list of results unless handle=True then returns iterable of futures """ if cwd is not None: os.chdir(cwd) if test: return [func(*arg, **kwargs) for arg in args] with ProcessPoolExecutor() as executor: if expand: futures = [executor.submit(func, *arg, **kwargs) for arg in args] else: futures = [executor.submit(func, arg, **kwargs) for arg in args] if order: it = futures else: it = as_completed(futures) if handle: return it else: return [future.result() for future in it]
[docs]def cluster_call(func, args, kwargs={}, timeout=.1, cwd=None, debug=False): '''for backwards compatibility only''' args = zip(*args) largs = kwarg_match(func, kwargs) if 'debug' in kwargs: test = kwargs['debug'] or debug else: test = False or debug return pool_call(func, args, kwargs=largs, cwd=cwd, order=True, expand=True, test=test)
[docs]def read_epw(epw): '''read daylight sky data from epw or wea file Returns ------- out: tuple (month, day, hour, dirnorn, difhoriz, globhoriz, skycover) ''' if hasattr(epw, 'readlines'): f = epw else: f = open(epw, 'r') lines = f.readlines() f.seek(0) hours = [re.split(r'[ \t,]+', i) for i in lines if re.match(r"\d.*", i)] data = [] for h in hours: if len(h) > 23: dp = [h[1], h[2], h[3], h[14], h[15], h[16], h[23]] hoff = .5 else: try: dp = [h[0], h[1], h[2], h[3], h[4], h[5], h[6]] except IndexError: dp = [h[0], h[1], h[2], h[3], h[4], "0", "0"] hoff = 0 data.append([int(i.strip()) for i in dp[0:2]] + [float(dp[2]) - hoff] + [float(i.strip()) for i in dp[3:]]) return data
[docs]def isnum(s): """test if input can be converted to float""" try: float(s) return True except Exception: return False
[docs]def try_float(s): """attempt conversion to float""" try: a = float(s) return a except Exception: return s
[docs]def coerce_data(datastr, i_vals, dataf, coerce=True): '''ensure all data points parsed are valid numbers''' if datastr == [['']]: return [[]] try: i = None if coerce: data = [[float(j[i]) for j in datastr if isnum(j[i])] for i in i_vals] if len(data[0]) == 0: raise ValueError("check if data file {} has xheaders {}" "".format(dataf, datastr)) else: data = [[try_float(j[i]) for j in datastr] for i in i_vals] except ValueError as ex: raise ex except Exception: try: err = "list index out of range index: {} in file: {}"\ "".format(i, dataf) except Exception: err = "bad value or no data in file: {}, try coerce=False"\ "".format(dataf) raise IndexError(err) return data
[docs]def get_i(i, d_vals): """ if (x, y) return y if x == i if y return y """ ds = [] for j in d_vals: if type(j) == tuple and j[0] == i: ds.append(j[1]) elif type(j) != tuple: ds.append(j) return ds
[docs]def read_data_file(dataf, header=False, xheader=False, comment="#", delim="\t, ", coerce=True): delim = '[{}]+'.format(delim) if comment != "#": comment = "^[{}].*".format(comment) elif not header and coerce: comment = r"^[^\-\d\w\.].*" else: comment = "^[{}].*".format(comment) f = open(dataf, 'r') dl = [i.strip() for i in re.split(r'[\n\r]+', f.read().strip())] if xheader: dl = [re.split(delim, i.strip(), 1)[1] for i in dl] dl = [i for i in dl if not bool(re.match(comment, i))] if len(dl) == 0: click.echo("File: {} has no data".format(dataf), err=True) raise click.Abort() datastr = [[j.strip() for j in re.split(delim, i.strip())] for i in dl] f.close() return datastr
[docs]def read_data(dataf, x_vals=[0], y_vals=[-1], rows=False, header=False, weax=None, reverse=False, autox=None, comment="#", xheader=False, delim="\t, ", coerce=True, weatherfile=False, drange=None): """read generic csv/tsv data file Parameters ---------- dataf: str file to read data from x_vals: list of ints column (or row with rows=True) indices for x values y_vals: list of ints column (or row with rows=True) indices for y values rows: Boolean if True read data in rows header: Boolean return first row (or column with rows=True) as series labels weax: 2 item list of ints idx for month and day to use day number as x_vals, if given ignores x_val reverse: Boolean reverse order of data (use with autox) autox: Boolean assigns integers (starting at 0) as x_vals comment: str comment line signifiers (inserted in regex ^[comment].*) delim: str delimeters for parsing data (inserted in regex [delim]+) coerce: Boolean raise exception if all values are are not numbers weatherfile: str of file path handles wea and epw file formates returning daylight parameters drange: list of ints limit series output to given indices. Returns ------- datax: list list of x_vals for each y_val (pads with last item if necessary) if there are more x_vals than y_vals does not return excess datax datay: list list for each y_val head: list if header=True list of labels for each y_val else [] """ if weatherfile: datastr = [[str(i) for i in j] for j in read_epw(dataf)] head = ['month', 'day', 'hour', 'direct normal', 'diffuse horizontal', 'global horizontal', 'sky cover'] head = [head[i] for i in y_vals] else: datastr = read_data_file(dataf, header, xheader, comment, delim, coerce=coerce) if rows: if header: head = datastr[0] datastr = list(map(list, list(zip(*datastr[1:])))) else: datastr = list(map(list, list(zip(*datastr)))) head = [] elif header: head = [datastr[0][i] for i in y_vals] datastr = datastr[1:] else: head = [] if reverse: datastr.reverse() if drange is not None: datastr = [datastr[i] for i in drange] if len(y_vals) > 0: datay = coerce_data(datastr, y_vals, dataf, coerce) else: datay = [[]] if autox is not None: datax = [] for i in datay: try: inc = (autox[1]-autox[0])/float(len(i)-1) datax.append([j*inc+autox[0] for j in range(len(i))]) except ZeroDivisionError: datax.append([autox[0]]) elif weax is not None: daycount = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334] datax = [[float(j[i]) for i in weax] for j in datastr if isnum(j[weax[0]])] datax = [[daycount[int(i[0])-1] + i[1] for i in datax]] else: if len(x_vals) > 0: datax = coerce_data(datastr, x_vals, dataf, coerce) else: datax = [[]] while len(datax) < len(datay) and len(datax) > 0: datax += [datax[-1]] if len(datax) > len(datay): datax = datax[:len(datay)] return datax, datay, head
[docs]def read_all_data(datafs, x_vals=[], y_vals=[], **kwargs): """ read multiple data files and pair x and y data call read_data Parameters ---------- datafs: list of str files to read data from x_vals: list of ints or tuple int pairs (fileidx, colidx) or colidx to read from each file y_vals: list of ints or tuple int pairs (fileidx, colidx) or colidx to read from each file kwargs: optional arguments for read_data Returns ------- datax: list list of x_vals for each y_val (pads with last item if necessary) datay: list list for each y_val head: list if header=True list of labels for each y_val else [] """ xds = [] yds = [] labels = [] try: if kwargs['autox']: x_vals = y_vals except Exception: pass for x in x_vals: if type(x) == tuple: xd, _, _ = read_data(datafs[x[0]], [x[1]], [-1], **kwargs) xds += xd else: for d in datafs: xd, _, _ = read_data(d, [x], [-1], **kwargs) xds += xd for y in y_vals: if type(y) == tuple: _, yd, label = read_data(datafs[y[0]], [], [y[1]], **kwargs) yds += yd labels += label else: for d in datafs: _, yd, label = read_data(d, [], [y], **kwargs) yds += yd labels += label while len(xds) < len(yds) and len(xds) > 0: xds += [xds[-1]] return xds, yds, labels
[docs]def clean_tmp(ctx): f, path = tempfile.mkstemp(dir="./", prefix='clasp_tmp') if ctx.obj is None: ctx.obj = dict(temps=[path]) else: ctx.obj['temps'].append(path) return path
[docs]def expandpat(pat, s, mark=0): """expand sglob pattern for each character option Parameters ---------- pat: regex regex pattern to split on s: str string to split mark: int 0: include splitting mark in output 1: skip splitting mark (assume 1 character in length) Returns ------- allpat: list of strings list of strings enumerating all possible combinations of pattern """ if re.search(pat, s): parts = re.split(pat, s) marks = re.findall(pat, s) patm = [] for i, ma in enumerate(marks): part = [parts[i]] * (len(ma) - (2 + mark)) for j, mai in enumerate(ma[1 + mark:-1]): part[j] += mai patm.append(part) patm.append([parts[-1]]) allpat = [''.join(i) for i in zip(*crossref_all(patm))] return allpat else: return []
[docs]def sglob(s): '''super glob includes [abc] notation + [!abc] exclude notation''' inre = r'\[[\w\d\-\_\.]+\]' exre = r'\[\![\w\d\-\_\.]+\]' inpat = expandpat(inre, s) + [s] exglob = flat_list([expandpat(exre, i, 1) for i in inpat]) inglob = [re.sub(exre, '*', i) for i in inpat] infiles = set(flat_list([glob(i) for i in inglob])) exfiles = set(flat_list([glob(i) for i in exglob])) return sorted(list(infiles.difference(exfiles)))