#!/usr/bin/env python ############################################################################## # Quick Intro: # 1) Create '.wot' in your home directory. Fill it with public keys from 'wot'. # 2) Create '.seals' in your home directory. Place all signatures there from 'sigs'. # 3) Create a 'patches' directory somewhere where 'v' can find it. Or use this one. # 4) ./v.py patches command # e.g., # ./v.py patches w # ^^ displays WoT # ./v.py patches p patches/asciilifeform_add_verifyall_option.vpatch asciis_bleedingedge # ^^ this 'presses' (creates the actual tree) # ^^ approximately like a 'checkout' in your vanilla flavoured shithub. ############################################################################## import os, sys, shutil, argparse, re, tempfile, gnupg, subprocess ############################################################################## vver = 98 # This program's Kelvin version. ## HOW YOU CAN HELP: ## # * TESTS plox, ty! # # Report findings in #bitcoin-assets on Freenode. ############################################################################## prolog = '''\ (C) 2015 NoSuchlAbs. You do not have, nor can you ever acquire the right to use, copy or distribute this software ; Should you use this software for any purpose, or copy and distribute it to anyone or in any manner, you are breaking the laws of whatever soi-disant jurisdiction, and you promise to continue doing so for the indefinite future. In any case, please always : read and understand any software ; verify any PGP signatures that you use - for any purpose. ''' intro = "V (ver. {0}K)\n".format(vver) ############################################################################## def toposort(unsorted): sorted = [] unsorted = dict(unsorted) while unsorted: acyclic = False for node, edges in unsorted.items(): for edge in edges: if edge in unsorted: break else: acyclic = True del unsorted[node] sorted.append((node, edges)) if not acyclic: fatal("Cyclic graph!") return sorted ############################################################################## vpatch_path = "vpatch" verbose = False def fatal(msg): sys.stderr.write(msg + "\n") exit(1) def spew(msg): if verbose: print msg # List of files in a directory, in lexical order. def dir_files(dir): return sorted([os.path.join(dir, fn) for fn in next(os.walk(dir))[2]]) # GPG is retarded and insists on 'keychain.' # This will be a temp dir, because we don't do any crypto. gpgtmp = tempfile.mkdtemp() gpg = gnupg.GPG(gnupghome=gpgtmp) gpg.encoding = 'utf-8' # Known WoT public keys. pubkeys = {} # The subset of vpatches that are considered valid. patches = [] # Banners (i.e. vpatches mapped to their guarantors) banners = {} # Roots (i.e. vpatches parented by thin air) roots = [] # Table mapping file hash to originating vpatch desc = {} desc['false'] = 'false' # Grep for diff magics, and memoize def vpdata(path, exp, cache): l = cache.get(path) if not l: l = [] patch = open(path, 'r').read() for m in re.findall(exp, patch, re.MULTILINE): l += [{'p':m[0], 'h':m[1]}] cache[path] = l return l # Get parents of a vpatch pcache = {} def parents(vpatch): parents = vpdata(vpatch, r'^--- (\S+) (\S+)$', pcache) if not parents: fatal("{0} is INVALID, check whether it IS a vpatch!".format(vpatch)) return parents # Get children of a vpatch ccache = {} def children(vpatch): children = vpdata(vpatch, r'^\+\+\+ (\S+) (\S+)$', ccache) if not children: fatal("{0} is INVALID, check whether it IS a vpatch!".format(vpatch)) # Record descendents: for child in children: h = child['h'] if h != 'false': desc[h] = vpatch return children # It is entirely possible to have more than one root! # ... exactly how, is left as an exercise for readers. def find_roots(patchset): rset = [] # Walk, find roots for p in patchset: if all(p['h'] == 'false' for p in parents(p)): rset += [p] spew("Found a Root: '{0}'".format(p)) return rset # Get antecedents. def get_ante(vpatch): ante = {} for p in parents(vpatch): pp = desc.get(p['h']) # Patch where this appears if not ante.get(pp): ante[pp] = [] ante[pp] += [p['p']] return ante # Get descendants. def get_desc(vpatch): des = {} for p in patches: ante = get_ante(p) if vpatch in ante.keys(): des[p] = ante[vpatch] return des ############################################################################## # Print name of patch and its guarantors, or 'WILD' if none known. def disp_vp(vpatch): seals = ', '.join(map(str, banners[vpatch])) if seals == '': seals = 'WILD' return "{0} ({1})".format(vpatch, seals) ############################################################################## # Command: WoT def c_wot(args): for k in pubkeys.values(): print "{0}:{1} ({2})".format(k['handle'], k['fp'], k['id']) # Command: Flow def c_flow(args): for p in patches: print disp_vp(p) # Command: Roots. def c_roots(args): for r in roots: print "Root: " + disp_vp(r) # Command: Antecedents. def c_ante(args): ante = get_ante(args.query) for p in ante.keys(): if p != 'false': print "{0} [{1}]".format(disp_vp(p), '; '.join(map(str, ante[p]))) # Command: Descendants def c_desc(args): des = get_desc(args.query) for d in des.keys(): print "Descendant: {0} [{1}]".format(disp_vp(d), '; '.join(map(str, des[d]))) # Command: Press. def c_press(args): print "Pressing using head: {0} to path: '{1}'".format(args.head, args.dest) headpos = patches.index(args.head) seq = patches[:headpos + 1] if os.path.exists(args.dest): print "Warning: target {0} already exists".format(args.dest) else: os.mkdir(args.dest) for p in seq: print "Using: {0}".format(disp_vp(p)) out = subprocess.Popen([vpatch_path], cwd=args.dest, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) with open(p, "r") as inp: body = inp.read() stdout, stderr = out.communicate(body) print stdout, if out.returncode != 0: fatal(("-----------------------------------\n" + "There was an error while pressing:\n" + "{0}\n" + "Result in {1} might be in an invalid state!\n" + "-----------------------------------").format(stderr.strip(), args.dest)) print "Completed Pressing using head: {0} to path: '{1}'".format(args.head, args.dest) # Command: Origin. def c_origin(args): o = desc.get(args.query) if o: print disp_vp(o) else: print "No origin known." ############################################################################## ############################################################################## # Command line parameter processor. parser = argparse.ArgumentParser(description=intro, epilog=prolog) # Print paths, etc parser.add_argument('-v', dest='verbose', default=False, action="store_true", help='Verbose.') # Permit the use of patches no one has yet sealed. Use this ONLY for own dev work! parser.add_argument('-wild', dest='wild', default=False, action="store_true", help='Permit wild (UNSEALED!) vpatches.') # Glom keyid (short fingerprint) onto every WoT handle. parser.add_argument('-fingers', dest='fingers', default=False, action="store_true", help='Prefix keyid to all WoT handles.') # Default path of WoT public keys is /home/yourusername/.wot # This dir must exist. Alternatively, you may specify another. parser.add_argument('--wot', dest='wot', default=os.path.join(os.path.expanduser('~'), '.wot'), action="store", help='Use WoT in given directory. (Default: ~/.wot)') # Default path of the seals (PGP signatures) is /home/yourusername/.seals # This dir must exist. Alternatively, you may specify another. parser.add_argument('--seals', dest='seals', default=os.path.join(os.path.expanduser('~'), '.seals'), action="store", help='Use Seals in given directory. (Default: ~/.seals)') # REQUIRED: Path of directory with vpatches. parser.add_argument('vpatches', help='Vpatch directory to operate on. [REQUIRED]') # REQUIRED: Command. subparsers = parser.add_subparsers(help='Command [REQUIRED]') parser_w = subparsers.add_parser('w', help='Display WoT.') parser_w.set_defaults(f=c_wot) parser_r = subparsers.add_parser('r', help='Display Roots.') parser_r.set_defaults(f=c_roots) parser_a = subparsers.add_parser('a', help='Display Antecedents [PATCH]') parser_a.set_defaults(f=c_ante) parser_a.add_argument('query', action="store", help='Patch.') parser_d = subparsers.add_parser('d', help='Display Descendants [PATCH]') parser_d.set_defaults(f=c_desc) parser_d.add_argument('query', action="store", help='Patch.') parser_l = subparsers.add_parser('f', help='Compute Flow.') parser_l.set_defaults(f=c_flow) parser_p = subparsers.add_parser('p', help='Press [HEADPATCH AND DESTINATION]') parser_p.set_defaults(f=c_press) parser_p.add_argument('head', action="store", help='Head patch.') parser_p.add_argument('dest', action="store", help='Destionation directory.') parser_o = subparsers.add_parser('o', help='Find Origin [SHA512]') parser_o.set_defaults(f=c_origin) parser_o.add_argument('query', action="store", help='SHA512 to search for.') ############################################################################## # V cannot operate without vpatches, WoT, and Seals datasets. def reqdir(path): if (not (os.path.isdir(path))): fatal("Directory '{0}' does not exist!".format(path)) return path def main(): global verbose, pubkeys, patches, roots, banners args = parser.parse_args() verbose = args.verbose # Patch and Sigs dirs pdir = reqdir(args.vpatches) sdir = reqdir(args.seals) wdir = reqdir(args.wot) spew("Using patches from:" + pdir) spew("Using signatures from:" + sdir) spew("Using wot from:" + wdir) pfiles = dir_files(pdir) sfiles = dir_files(sdir) wfiles = dir_files(wdir) # Build WoT from pubkeys handle = {} for w in wfiles: pubkey = open(w, 'r').read() impkey = gpg.import_keys(pubkey) for fp in impkey.fingerprints: handle[fp] = os.path.splitext(os.path.basename(w))[0] for k in gpg.list_keys(): name = handle[k['fingerprint']] if args.fingers: name += '-' + k['keyid'] uids = ', '.join(map(str, k['uids'])) pubkeys[k['keyid']] = {'fp':k['fingerprint'], 'id':uids, 'handle':name} for subkey in k.get('subkeys', []): keyid, fingerprint = subkey[0], subkey[2] pubkeys[keyid] = {'fp':fingerprint, 'id':uids, 'handle':name} # Validate seals for p in pfiles: pt = os.path.basename(p) banners[p] = [] for s in sfiles: sig = os.path.basename(s) # All seals must take the form patchtitle.vpatch.yourname.sig if sig.find(pt) == 0: # substring of sig filename up through '.vpatch' v = gpg.verify_file(open(s, 'r'), data_filename=p) if v.valid: banners[p] += [pubkeys[v.key_id]['handle']] else: fatal("---------------------------------------------------------------------\n" + "WARNING: {0} is an INVALID seal for {1} !\n".format(sig, pt) + "Check that this user is in your WoT, and that this key has not expired.\n" + "Otherwise remove the invalid seal from your SEALS directory.\n" + "---------------------------------------------------------------------") # Select the subset of vpatches currently in use. for p in pfiles: if banners.get(p) or args.wild: patches += [p] children(p) # Memoize. parents(p) # Memoize. roots = find_roots(patches) if not roots: fatal('No roots found!') # Topological ordering of flow graph l = [] for p in patches: l += [(p, get_desc(p).keys())] s = map(lambda x:x[0], toposort(l)) patches = s[::-1] # Run command args.f(args) # Remove temporary keychain shutil.rmtree(gpgtmp) ############################################################################## if __name__ == '__main__' : main() ##############################################################################