# Copyright (c) 2018 Stephen Wasilewski
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
"""callbacks for special parsing of command line inputs
**Callbacks By type**
**File input**
file inputs can be given with wildcard expansion (in quotes so that the
callback handles) using glob plus the following:
* [abc] (one of a, b, or c)
* [!abc] (none of a, b or c)
* '-' (hyphen) collect the stdin into a temporary file (clasp_tmp*)
* ~ expands user
**callback functions**
* is_file: check if a single path exists (prompts for user input if file
not found)
* are_files: recursively calls parse_file_list and prompts on error
* is_files_iter: calls is_file on each item; use when multiple=True
* are_files_iter: calls are_files on each item; use when multiple=True
* are_files_or_str: tries to parse as files, then tries split_int, then
split_float, then falls back to split_str
* are_files_or_str_iter: use when multiple=True
**String parsing**
* split_str: split with shlex.split
* split_str_iter: use when multiple=True
* color_inp: return alpha string, split on whitespace,
convert floats and parse tuples on ,
* char0: return first character
**Number parsing**
* tup_int: parses integer tuples from comma/space separated string
* tup_float: parses float tuples from comma/space separated string
* split_float: splits list of floats and extends ranges based on : notation
* split_int: splits list of ints and extends ranges based on : notation
"""
import os
import re
import sys
import shlex
import tempfile
import click
from clasp.script_tools import sglob
import clasp.script_tools as cst
# Display labels for the callbacks defined in this module.
# NOTE(review): the keys look like callback function names (presumably
# looked up by ``callback.__name__`` elsewhere) -- confirm against the caller.
pretty_callback_names = {
    'is_file': 'FILE',
    'are_files': 'FILES',
    # the callback defined in this module is ``is_files_iter``; the older
    # ``is_file_iter`` spelling is kept so existing lookups keep working
    'is_files_iter': 'FILES',
    'is_file_iter': 'FILES',
    'are_files_iter': 'FILES',
    'are_files_or_str': 'FILES,INTS,FLOATS,TEXTS',
    'are_files_or_str_iter': 'FILES,INTS,FLOATS,TEXTS',
    'split_str': 'TEXTS',
    'split_str_iter': 'TEXTS',
    'color_inp': 'COLORS',
    'tup_int': 'INTS INTS,INTS',
    'tup_float': 'FLOATS, FLOATS,FLOATS',
    'split_float': 'FLOATS',
    'split_int': 'INTS',
    'are_valid_paths': 'PATHS',
    'int_tups': 'INT,INT INT,INT',
    'tup_list': 'TEXT',
}
def callback_error(s, param, example):
    """Abort argument parsing with a uniform "can't parse" message.

    Shared by the custom callbacks in this module so every parsing
    failure is reported to the user in the same format.

    Parameters
    ----------
    s: value
        the raw input that could not be parsed
    param: click.core.Option
        the parameter being processed (not used to build the message)
    example:
        example of a valid entry format, echoed back in the message

    Raises
    ------
    click.BadParameter
        always
    """
    template = "\ncan't parse: {}\nexpected input format: '{}'"
    raise click.BadParameter(template.format(s, example))
def is_file(ctx, param, s):
    """Validate a single file path, re-prompting until an existing one is given.

    ``'-'`` is handed to ``tmp_stdin`` and ``None``/``'None'``/``'none'``
    yield ``None``. Any other value is returned unchanged when it exists
    on disk.

    When the path is missing a notice is echoed to stderr and the user is
    prompted for a replacement, which is validated by calling this function
    again. Set ``os.environ['CLASP_PIPE'] = '1'`` in a parent script
    (or ``CLASP_PIPE=1`` in the environment) to disable the prompt and avoid
    a hanging process; in that case, or when ``ctx.resilient_parsing`` is
    set, ``click.Abort`` is raised instead.
    """
    if s == '-':
        return tmp_stdin(ctx)
    if s in [None, 'None', 'none']:
        return None
    command = ctx.info_name
    name = param.name
    try:
        missing = None if os.path.exists(s) else ValueError(s)
    except ValueError as err:
        # keep whatever ValueError the existence check itself produced
        missing = err
    if missing is None:
        return s
    # prompting is only allowed when CLASP_PIPE is not set to '1'
    interactive = os.environ.get('CLASP_PIPE') != '1'
    click.echo("{} not an existing file".format(missing), err=True)
    if interactive and not ctx.resilient_parsing:
        reply = click.prompt("{} for {}".format(name, command))
        return is_file(ctx, param, reply)
    raise click.Abort()
def char0(ctx, param, s):
    """Return the first character of ``s`` in lower case.

    ``None`` (option not supplied) and the empty string are returned
    unchanged instead of raising on the ``s[0]`` lookup.
    """
    if not s:
        return s
    return s[0].lower()
def are_files(ctx, param, s, prompt=True):
    """Validate a file-list string, re-prompting until it parses.

    ``None``/``'None'``/``'none'`` yield ``None``; otherwise the string is
    expanded with ``parse_file_list``.

    When parsing fails and ``prompt`` is true (and ``ctx.resilient_parsing``
    is not set), a notice is echoed to stderr and the user is asked for a
    replacement value, which is validated by calling this function again.
    Set ``os.environ['CLASP_PIPE'] = '1'`` in a parent script (or
    ``CLASP_PIPE=1`` in the environment) to disable the prompt and avoid a
    hanging process; ``click.Abort`` is raised instead.

    Raises
    ------
    ValueError
        when parsing fails and prompting is disabled through ``prompt`` or
        ``ctx.resilient_parsing``
    click.Abort
        when parsing fails and CLASP_PIPE is '1'
    """
    if s in [None, 'None', 'none']:
        return None
    try:
        return parse_file_list(ctx, s)
    except ValueError as e:
        if not prompt or ctx.resilient_parsing:
            raise
        click.echo("{} not an existing file".format(e), err=True)
        if os.environ.get('CLASP_PIPE') == '1':
            raise click.Abort()
        # The prompt used to be guarded by ``ctx.resilient_parsing``, which
        # is always False on this path, so it could never be reached.
        s2 = click.prompt("{} for {}".format(param.name, ctx.info_name))
    return are_files(ctx, param, s2)
def are_valid_paths(ctx, param, s):
    """Expand a path-list string with ``parse_file_list(..., valid=True)``.

    ``None``/``'None'``/``'none'`` yield ``None``. There is no prompting:
    a ``ValueError`` from parsing is re-raised as a new ``ValueError``.
    """
    if s not in [None, 'None', 'none']:
        try:
            paths = parse_file_list(ctx, s, valid=True)
        except ValueError as err:
            raise ValueError(err)
        return paths
    return None
def is_files_iter(ctx, param, s):
    """Apply ``is_file`` to every item of iterable ``s`` (use with multiple=True).

    Returns the results as a list, in input order.
    """
    return [is_file(ctx, param, entry) for entry in s]
def are_files_iter(ctx, param, s, prompt=True):
    """Apply ``are_files`` to every item of iterable ``s`` (use with multiple=True).

    ``prompt`` is forwarded to each ``are_files`` call. Returns the results
    as a list, in input order.
    """
    return [are_files(ctx, param, entry, prompt) for entry in s]
def are_files_or_str(ctx, param, s):
    """Parse ``s`` as files, then ints, then floats, then plain strings.

    The first parser that succeeds wins:

    1. ``are_files`` with prompting disabled
    2. ``split_int``, with each value converted to ``str``
    3. ``split_float``, with each value converted to ``str``
    4. ``split_str``
    """
    try:
        return are_files(ctx, param, s, False)
    except ValueError:
        pass
    # numeric parsers are tried in order; any failure falls through
    for parser in (split_int, split_float):
        try:
            return [str(value) for value in parser(ctx, param, s)]
        except Exception:
            continue
    return split_str(ctx, param, s)
def are_files_or_str_iter(ctx, param, s):
    """Apply ``are_files_or_str`` to every item of iterable ``s``.

    Use with multiple=True. Returns the results as a list, in input order.
    """
    return [are_files_or_str(ctx, param, entry) for entry in s]
#: string parsing callbacks
def split_str(ctx, param, s):
    """Split a space separated string with ``shlex.split``.

    * ``None``/``'None'``/``'none'`` return ``None``
    * the empty string returns ``''``
    * ``@path`` where ``path`` exists: the contents of that file are read,
      stripped and split instead
    * anything else (including ``@path`` for a missing path) is split as is
    """
    if s in [None, 'None', 'none']:
        return None
    if len(s) == 0:
        return ''
    if s[0] == '@' and os.path.exists(s[1:]):
        # context manager so the handle is closed once the text is read
        with open(s[1:], 'r') as f:
            return shlex.split(f.read().strip())
    return shlex.split(s)
def split_str_iter(ctx, param, s):
    """Apply ``split_str`` to every item of iterable ``s`` (use with multiple=True).

    Returns the results as a list, in input order.
    """
    return [split_str(ctx, param, entry) for entry in s]
def color_inp(ctx, param, s):
    """Parse color values from a comma/space separated string, or pass a name through.

    When ``s`` consists of two or more numeric fields separated by commas
    and/or whitespace, it is split on whitespace: tokens containing a comma
    become tuples of floats and other tokens become floats. Any other
    string (e.g. a cmap name) is returned unchanged.

    ``None`` (option not supplied) is returned as ``None`` rather than
    failing inside ``re.match``.
    """
    if s is None:
        return None
    if not re.match(r"^([\d\.]+[, \t]+)+[\d\.]+$", s):
        return s
    return [
        tuple(float(i) for i in x.split(",")) if "," in x else float(x)
        for x in s.split()
    ]
def tup_int(ctx, param, s, recurs=False):
    """Parse integers or 2-tuples from a comma/space separated string.

    Each whitespace separated token is expanded with ``cst.int_rng``
    (range ``:`` notation). A token containing a comma yields every
    ``(first, second)`` pair from the expansions of its first two comma
    separated fields.

    Parameters
    ----------
    s:
        string to parse, or an iterable of strings when ``param.multiple``
    recurs:
        internal flag set on per-item calls for ``multiple`` parameters

    Returns
    -------
    ``None`` when ``s`` is ``None``/``'None'``/``'none'``. For ``multiple``
    parameters, a list with one parsed list per item, or ``None`` if any
    item parses to ``None``. Otherwise a flat list of ints and tuples.

    Parse failures are reported through ``callback_error``.
    """
    if s in [None, 'None', 'none']:
        return None
    if param.multiple and not recurs:
        # parse each item once and reuse the result
        parsed = [tup_int(ctx, param, s2, recurs=True) for s2 in s]
        return None if None in parsed else parsed
    so = []
    try:
        for x in s.split():
            if "," in x:
                first, second = x.split(",")[:2]
                for fis in cst.int_rng(first):
                    for col in cst.int_rng(second):
                        so.append((fis, col))
            else:
                so.extend(cst.int_rng(x))
    except Exception:
        callback_error(s, param, '0 0,1 0,3')
    return so
def int_tups(ctx, param, s):
    """Parse integer tuples from a comma/space separated string.

    Every whitespace separated token becomes one tuple of ints, split on
    commas (``'0,1 2,3'`` -> ``[(0, 1), (2, 3)]``).
    ``None``/``'None'``/``'none'`` return ``None``. Parse failures are
    reported through ``callback_error``.
    """
    if s in [None, 'None', 'none']:
        return None
    so = []
    try:
        for x in s.split():
            so.append(tuple(int(i) for i in x.split(",")))
    except Exception:
        # the example shown to the user must itself be valid integer input
        callback_error(s, param, '0,1 0,3')
    return so
def tup_float(ctx, param, s):
    """Parse floats and float tuples from a comma/space separated string.

    Tokens are split on whitespace; a token containing a comma becomes a
    tuple of floats, any other token a single float.
    ``None``/``'None'``/``'none'`` return ``None``. Parse failures are
    reported through ``callback_error``.
    """
    if s in [None, 'None', 'none']:
        return None
    so = []
    try:
        for token in s.split():
            numbers = [float(piece) for piece in token.split(",")]
            # a token without a comma splits into exactly one piece
            so.append(tuple(numbers) if "," in token else numbers[0])
    except Exception:
        callback_error(s, param, '0 0,1 0,3.6')
    return so
def tup_list(ctx, param, s):
    """Return ``s`` converted to a list, or ``None`` when ``s`` is ``None``."""
    return None if s is None else list(s)
def split_float(ctx, param, s):
    """Split a whitespace separated list of floats, expanding ``:`` ranges.

    A token containing ``:`` is split on ``:``, its fields are converted to
    float and passed to ``cst.arange``; any other token is converted with
    ``float``. All values are rounded to 6 decimal places.
    ``None``/``'None'``/``'none'`` return ``None``. Parse failures are
    reported through ``callback_error``.
    """
    if s in [None, 'None', 'none']:
        return None
    try:
        values = []
        for token in s.split():
            if ':' not in token:
                values.append(float(token))
                continue
            bounds = (float(field) for field in token.split(':'))
            values.extend(cst.arange(*bounds))
        values = [round(value, 6) for value in values]
    except Exception:
        callback_error(s, param, '0 30.5 40')
    return values
def split_int(ctx, param, s):
    """Split a whitespace separated list of ints, expanding ``:`` ranges.

    A token containing ``:`` is split on ``:``, its fields are converted to
    int and passed to ``cst.arange``; any other token is converted with
    ``int``. A single-element result is re-wrapped with its value passed
    through ``int``.
    ``None``/``'None'``/``'none'`` return ``None``. Parse failures are
    reported through ``callback_error``.
    """
    if s in [None, 'None', 'none']:
        return None
    try:
        numbers = []
        for token in s.split():
            if ':' not in token:
                numbers.append(int(token))
                continue
            bounds = (int(field) for field in token.split(':'))
            numbers.extend(cst.arange(*bounds))
        if len(numbers) == 1:
            numbers = [int(numbers[0])]
    except Exception:
        callback_error(s, param, '0 30 40')
    return numbers
def data_stream(ctx, param, s):
    """Return a binary input stream for ``s``.

    * ``None``/``'None'``/``'none'`` return ``None``
    * ``'-'`` returns ``sys.stdin.buffer``
    * anything else is opened with mode ``'rb'`` and the open file object
      is returned; a failure to open is reported through ``callback_error``
    """
    if s in [None, 'None', 'none']:
        return None
    if s == '-':
        return sys.stdin.buffer
    try:
        stream = open(s, 'rb')
    except Exception:
        callback_error(s, param, 'should be an existing file')
    return stream
def tmp_stdin(ctx):
    """Read stdin into a temporary file and return its path.

    The file is created in the current directory with the prefix
    ``clasp_tmp`` and its path is appended to ``ctx.obj['temps']``
    (``ctx.obj`` is created as a dict when it is ``None``).

    Use ``ctx.resilient_parsing=True`` to pass ``"-"`` through directly;
    in that case nothing is read or written.
    """
    if ctx.resilient_parsing:
        return "-"
    fd, path = tempfile.mkstemp(dir="./", prefix='clasp_tmp')
    # mkstemp returns an already-open OS-level descriptor; wrap it so it is
    # closed with the file instead of being leaked by a second open()
    with os.fdopen(fd, 'wb') as f:
        f.write(sys.stdin.buffer.read())
    if ctx.obj is None:
        ctx.obj = dict(temps=[path])
    elif 'temps' in ctx.obj:
        ctx.obj['temps'].append(path)
    else:
        ctx.obj['temps'] = [path]
    return path
def parse_file_list(ctx, s, valid=False):
    """Parse a list of files from ``s`` using glob expansion.

    ``s`` is tokenised with ``shlex.split`` and each token is handled as:

    * a token containing ``~`` is passed through ``os.path.expanduser``
    * ``-``: stdin is collected with ``tmp_stdin``
    * ``@path``: each line of the file at ``path`` is parsed recursively
      (with the same ``valid`` setting)
    * a pattern matched by ``sglob``: every match is added
    * otherwise the token must exist, or, when ``valid`` is true, its
      parent directory must exist (a bare file name counts as being in the
      current directory)

    Parameters
    ----------
    ctx:
        click context, only used for ``tmp_stdin``
    s:
        string holding the file list
    valid:
        also accept paths that do not exist yet but whose directory does

    Returns
    -------
    list of paths

    Raises
    ------
    ValueError
        carrying the offending path when a token cannot be resolved
    """
    files = []
    for i in shlex.split(s):
        if "~" in i:
            i = os.path.expanduser(i)
        if i == '-':
            files.append(tmp_stdin(ctx))
        elif i.startswith('@'):
            listing = i[1:]
            if not os.path.exists(listing):
                raise ValueError(listing)
            # read everything first so the handle is closed before recursing
            with open(listing, 'r') as f:
                entries = [j.strip() for j in f]
            for entry in entries:
                files += parse_file_list(ctx, entry, valid)
        else:
            matches = sglob(i)
            if len(matches) > 0:
                for j in matches:
                    if os.path.exists(j):
                        files.append(j)
                    else:
                        raise ValueError(j)
            elif os.path.exists(i):
                files.append(i)
            elif (valid and i
                    and os.path.isdir(os.path.dirname(i) or os.curdir)):
                # dirname('name.txt') is '', which means the current directory
                files.append(i)
            else:
                raise ValueError(i)
    return files