Add qutebrowser userscripts and minor fixes
This commit is contained in:
		
							parent
							
								
									44e50eabb1
								
							
						
					
					
						commit
						513e3b8263
					
				
					 5 changed files with 427 additions and 6 deletions
				
			
		|  | @ -3,8 +3,6 @@ | |||
| import os | ||||
| import argparse | ||||
| 
 | ||||
| import unalix | ||||
| 
 | ||||
| 
 | ||||
| def qute(cmd): | ||||
|     with open(os.environ['QUTE_FIFO'], 'w') as fifo: | ||||
|  | @ -32,13 +30,17 @@ if __name__ == '__main__': | |||
|         args = parse_args() | ||||
| 
 | ||||
|         url = os.environ['QUTE_URL'] | ||||
|         clean_url = unalix.clear_url(url) | ||||
| 
 | ||||
|         try: | ||||
|             import unalix | ||||
|             url = unalix.clear_url(url) | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         if args.selection: | ||||
|             qute('yank inline "{}" -s'.format(clean_url)) | ||||
|             qute('yank inline "{}" -s'.format(url)) | ||||
|         else: | ||||
|             qute('yank inline "{}"'.format(clean_url)) | ||||
|             qute('yank inline "{}"'.format(url)) | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         error(str(e)) | ||||
|  |  | |||
							
								
								
									
										414
									
								
								.local/share/qutebrowser/userscripts/qute-keepassxc
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										414
									
								
								.local/share/qutebrowser/userscripts/qute-keepassxc
									
										
									
									
									
										Executable file
									
								
							|  | @ -0,0 +1,414 @@ | |||
| #!/usr/bin/env python3 | ||||
| 
 | ||||
| # Copyright (c) 2018-2021 Markus Blöchl <ususdei@gmail.com> | ||||
| # | ||||
| # This file is part of qutebrowser. | ||||
| # | ||||
| # qutebrowser is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| # | ||||
| # qutebrowser is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU General Public License for more details. | ||||
| # | ||||
| # You should have received a copy of the GNU General Public License | ||||
| # along with qutebrowser.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| # Introduction | ||||
| 
 | ||||
| This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database. | ||||
| 
 | ||||
| 
 | ||||
| # Installation | ||||
| 
 | ||||
| First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config. | ||||
| 
 | ||||
| 
 | ||||
| Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3]. | ||||
| 
 | ||||
| 
 | ||||
| Third, install the python module `pynacl`. | ||||
| 
 | ||||
| 
 | ||||
| Finally, adapt your qutebrowser config. | ||||
| You can e.g. add the following lines to your `~/.config/qutebrowser/config.py` | ||||
| Remember to replace `ABC1234` with your actual GPG key. | ||||
| 
 | ||||
| ```python | ||||
| config.bind('<Alt-Shift-u>', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert') | ||||
| config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal') | ||||
| ``` | ||||
| 
 | ||||
| 
 | ||||
| # Usage | ||||
| 
 | ||||
| If you are on a webpage with a login form, simply activate one of the configured key-bindings. | ||||
| 
 | ||||
| The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension. | ||||
| Just provide a name of your choice and accept the request if nothing looks fishy. | ||||
| 
 | ||||
| 
 | ||||
| # How it works | ||||
| 
 | ||||
| This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4]. | ||||
| 
 | ||||
| 
 | ||||
| This script needs to store the key used to associate with your KeepassXC instance somewhere. | ||||
| Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way | ||||
| by storing the key in encrypted form using GPG. | ||||
| Therefore you need to have a public-key-pair readily set up. | ||||
| 
 | ||||
| GPG might then ask for your private-key password whenever you query the database for login credentials. | ||||
| 
 | ||||
| 
 | ||||
| [1]: https://keepassxc.org/ | ||||
| [2]: https://qutebrowser.org/ | ||||
| [3]: https://gnupg.org/ | ||||
| [4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md | ||||
| [5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc | ||||
| [6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration | ||||
| """ | ||||
| 
 | ||||
| import sys | ||||
| import os | ||||
| import socket | ||||
| import json | ||||
| import base64 | ||||
| import shlex | ||||
| import subprocess | ||||
| import argparse | ||||
| 
 | ||||
| import nacl.utils | ||||
| import nacl.public | ||||
| 
 | ||||
| 
 | ||||
| def parse_args(): | ||||
|     parser = argparse.ArgumentParser(description="Full passwords from KeepassXC") | ||||
|     parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL')) | ||||
|     parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()), | ||||
|                         help='Path to KeepassXC browser socket') | ||||
|     parser.add_argument('--key', '-k', default='alice@example.com', | ||||
|                         help='GPG key to encrypt KeepassXC auth key with') | ||||
|     parser.add_argument('--insecure', action='store_true', | ||||
|                         help="Do not encrypt auth key") | ||||
|     parser.add_argument('--dmenu-invocation', '-d', default='rofi -dmenu', | ||||
|                         help='Invocation used to execute a dmenu-provider') | ||||
|     parser.add_argument('--only-username', action='store_true', | ||||
|                         help='Only insert username') | ||||
|     parser.add_argument('--only-password', action='store_true', | ||||
|                         help='Only insert password') | ||||
|     parser.add_argument('--only-otp', action='store_true', | ||||
|                         help='Only insert OTP code') | ||||
|     return parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| class KeepassError(Exception): | ||||
|     def __init__(self, code, desc): | ||||
|         self.code = code | ||||
|         self.description = desc | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return f"KeepassXC Error [{self.code}]: {self.description}" | ||||
| 
 | ||||
| 
 | ||||
| class KeepassXC: | ||||
|     """ Wrapper around the KeepassXC socket API """ | ||||
|     def __init__(self, id=None, *, key, socket_path): | ||||
|         self.sock        = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||||
|         self.id          = id | ||||
|         self.socket_path = socket_path | ||||
|         self.client_key  = nacl.public.PrivateKey.generate() | ||||
|         self.id_key      = nacl.public.PrivateKey.from_seed(key) | ||||
|         self.cryptobox   = None | ||||
| 
 | ||||
|     def connect(self): | ||||
|         if not os.path.exists(self.socket_path): | ||||
|             raise KeepassError(-1, "KeepassXC Browser socket does not exists") | ||||
|         self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8') | ||||
|         self.sock.connect(self.socket_path) | ||||
| 
 | ||||
|         self.send_raw_msg(dict( | ||||
|             action    = 'change-public-keys', | ||||
|             publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), | ||||
|             nonce     = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'), | ||||
|             clientID  = self.client_id | ||||
|         )) | ||||
| 
 | ||||
|         resp = self.recv_raw_msg() | ||||
|         assert resp['action'] == 'change-public-keys' | ||||
|         assert resp['success'] == 'true' | ||||
|         assert resp['nonce'] | ||||
|         self.cryptobox = nacl.public.Box( | ||||
|             self.client_key, | ||||
|             nacl.public.PublicKey(base64.b64decode(resp['publicKey'])) | ||||
|         ) | ||||
| 
 | ||||
|     def get_databasehash(self): | ||||
|         self.send_msg(dict(action='get-databasehash')) | ||||
|         return self.recv_msg()['hash'] | ||||
| 
 | ||||
|     def lock_database(self): | ||||
|         self.send_msg(dict(action='lock-database')) | ||||
|         try: | ||||
|             self.recv_msg() | ||||
|         except KeepassError as e: | ||||
|             if e.code == 1: | ||||
|                 return True | ||||
|             raise | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
|     def test_associate(self): | ||||
|         if not self.id: | ||||
|             return False | ||||
|         self.send_msg(dict( | ||||
|             action = 'test-associate', | ||||
|             id     = self.id, | ||||
|             key    = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') | ||||
|         )) | ||||
|         return self.recv_msg()['success'] == 'true' | ||||
| 
 | ||||
|     def associate(self): | ||||
|         self.send_msg(dict( | ||||
|             action = 'associate', | ||||
|             key    = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), | ||||
|             idKey  = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') | ||||
|         )) | ||||
|         resp = self.recv_msg() | ||||
|         self.id = resp['id'] | ||||
| 
 | ||||
|     def get_logins(self, url): | ||||
|         self.send_msg(dict( | ||||
|             action = 'get-logins', | ||||
|             url    = url, | ||||
|             keys   = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }] | ||||
|         )) | ||||
|         return self.recv_msg()['entries'] | ||||
| 
 | ||||
|     def get_totp(self, uuid): | ||||
|         self.send_msg(dict( | ||||
|             action = 'get-totp', | ||||
|             uuid = uuid, | ||||
|         )) | ||||
|         return self.recv_msg()['totp'] | ||||
| 
 | ||||
|     def send_raw_msg(self, msg): | ||||
|         self.sock.send( json.dumps(msg).encode('utf-8') ) | ||||
| 
 | ||||
|     def recv_raw_msg(self): | ||||
|         return json.loads( self.sock.recv(4096).decode('utf-8') ) | ||||
| 
 | ||||
|     def send_msg(self, msg, **extra): | ||||
|         nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE) | ||||
|         self.send_raw_msg(dict( | ||||
|             action   = msg['action'], | ||||
|             message  = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'), | ||||
|             nonce    = base64.b64encode(nonce).decode('utf-8'), | ||||
|             clientID = self.client_id, | ||||
|             **extra | ||||
|         )) | ||||
| 
 | ||||
|     def recv_msg(self): | ||||
|         resp = self.recv_raw_msg() | ||||
|         if 'error' in resp: | ||||
|             raise KeepassError(resp['errorCode'], resp['error']) | ||||
|         assert resp['action'] | ||||
|         return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8')) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| class SecretKeyStore: | ||||
|     def __init__(self, gpgkey): | ||||
|         self.gpgkey = gpgkey | ||||
|         if gpgkey is None: | ||||
|             self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key') | ||||
|         else: | ||||
|             self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg') | ||||
| 
 | ||||
|     def load(self): | ||||
|         "Load existing association key from file" | ||||
|         if self.gpgkey is None: | ||||
|             jsondata = open(self.path, 'r').read() | ||||
|         else: | ||||
|             jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8') | ||||
|         data = json.loads(jsondata) | ||||
|         self.id = data['id'] | ||||
|         self.key = base64.b64decode(data['key']) | ||||
| 
 | ||||
|     def create(self): | ||||
|         "Create new association key" | ||||
|         self.key = nacl.utils.random(32) | ||||
|         self.id = None | ||||
| 
 | ||||
|     def store(self, id): | ||||
|         "Store newly created association key in file" | ||||
|         self.id = id | ||||
|         jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')}) | ||||
|         if self.gpgkey is None: | ||||
|             open(self.path, "w").write(jsondata) | ||||
|         else: | ||||
|             subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True) | ||||
| 
 | ||||
| def fake_key_raw(text): | ||||
|     for character in text: | ||||
|         # Escape all characters by default, space requires special handling | ||||
|         sequence = '" "' if character == ' ' else '\{}'.format(character) | ||||
|         qute('fake-key {}'.format(sequence)) | ||||
| 
 | ||||
| def qute(cmd): | ||||
|     with open(os.environ['QUTE_FIFO'], 'w') as fifo: | ||||
|         fifo.write(cmd) | ||||
|         fifo.write('\n') | ||||
|         fifo.flush() | ||||
| 
 | ||||
| def error(msg): | ||||
|     print(msg, file=sys.stderr) | ||||
|     qute('message-error "{}"'.format(msg)) | ||||
| 
 | ||||
| 
 | ||||
| def connect_to_keepassxc(args): | ||||
|     assert args.key or args.insecure, "Missing GPG key to use for auth key encryption" | ||||
|     keystore = SecretKeyStore(args.key) | ||||
|     if os.path.isfile(keystore.path): | ||||
|         keystore.load() | ||||
|         kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket) | ||||
|         kp.connect() | ||||
|         if not kp.test_associate(): | ||||
|             error('No KeepassXC association') | ||||
|             return None | ||||
|     else: | ||||
|         keystore.create() | ||||
|         kp = KeepassXC(key=keystore.key, socket_path=args.socket) | ||||
|         kp.connect() | ||||
|         kp.associate() | ||||
|         if not kp.test_associate(): | ||||
|             error('No KeepassXC association') | ||||
|             return None | ||||
|         keystore.store(kp.id) | ||||
|     return kp | ||||
| 
 | ||||
| 
 | ||||
| def make_js_code(username, password): | ||||
|     return ' '.join(""" | ||||
|         function isVisible(elem) { | ||||
|             var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null); | ||||
| 
 | ||||
|             if (style.getPropertyValue("visibility") !== "visible" || | ||||
|                 style.getPropertyValue("display") === "none" || | ||||
|                 style.getPropertyValue("opacity") === "0") { | ||||
|                 return false; | ||||
|             } | ||||
| 
 | ||||
|             return elem.offsetWidth > 0 && elem.offsetHeight > 0; | ||||
|         }; | ||||
| 
 | ||||
|         function hasPasswordField(form) { | ||||
|             var inputs = form.getElementsByTagName("input"); | ||||
|             for (var j = 0; j < inputs.length; j++) { | ||||
|                 var input = inputs[j]; | ||||
|                 if (input.type === "password") { | ||||
|                     return true; | ||||
|                 } | ||||
|             } | ||||
|             return false; | ||||
|         }; | ||||
| 
 | ||||
|         function loadData2Form (form) { | ||||
|             var inputs = form.getElementsByTagName("input"); | ||||
|             for (var j = 0; j < inputs.length; j++) { | ||||
|                 var input = inputs[j]; | ||||
|                 if (isVisible(input) && (input.type === "text" || input.type === "email")) { | ||||
|                     input.focus(); | ||||
|                     input.value = %s; | ||||
|                     input.dispatchEvent(new Event('input', { 'bubbles': true })); | ||||
|                     input.dispatchEvent(new Event('change', { 'bubbles': true })); | ||||
|                     input.blur(); | ||||
|                 } | ||||
|                 if (input.type === "password") { | ||||
|                     input.focus(); | ||||
|                     input.value = %s; | ||||
|                     input.dispatchEvent(new Event('input', { 'bubbles': true })); | ||||
|                     input.dispatchEvent(new Event('change', { 'bubbles': true })); | ||||
|                     input.blur(); | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         function fillFirstForm() { | ||||
|             var forms = document.getElementsByTagName("form"); | ||||
|             for (i = 0; i < forms.length; i++) { | ||||
|                 if (hasPasswordField(forms[i])) { | ||||
|                     loadData2Form(forms[i]); | ||||
|                     return; | ||||
|                 } | ||||
|             } | ||||
|             alert("No Credentials Form found"); | ||||
|         }; | ||||
| 
 | ||||
|         fillFirstForm() | ||||
|     """.splitlines()) % (json.dumps(username), json.dumps(password)) | ||||
| 
 | ||||
| 
 | ||||
| def dmenu(items, invocation): | ||||
|     command = shlex.split(invocation) | ||||
|     process = subprocess.run(command, input='\n'.join(items) | ||||
|                              .encode('UTF-8'), stdout=subprocess.PIPE) | ||||
|     return process.stdout.decode('UTF-8').strip() | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     if 'QUTE_FIFO' not in os.environ: | ||||
|         print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript") | ||||
|         sys.exit(-1) | ||||
| 
 | ||||
|     try: | ||||
|         args = parse_args() | ||||
|         assert args.url, "Missing URL" | ||||
|         kp = connect_to_keepassxc(args) | ||||
|         if not kp: | ||||
|             error('Could not connect to KeepassXC') | ||||
|             return | ||||
|         creds = kp.get_logins(args.url) | ||||
|         if not creds: | ||||
|             error('No credentials found') | ||||
|             return | ||||
| 
 | ||||
| 
 | ||||
|         selection = creds[0] | ||||
|         if len(creds) > 1: | ||||
|             login = dmenu(sorted(map(lambda c: c['login'], creds)), args.dmenu_invocation) | ||||
| 
 | ||||
|             for c in creds: | ||||
|                 if c['login'] == login: | ||||
|                     selection = c | ||||
|                     break | ||||
| 
 | ||||
|         name, pw = selection['login'], selection['password'] | ||||
| 
 | ||||
|         if args.only_username: | ||||
|             if name: | ||||
|                 fake_key_raw(name) | ||||
|             return | ||||
|         if args.only_password: | ||||
|             if pw: | ||||
|                 fake_key_raw(pw) | ||||
|             return | ||||
|         if args.only_otp: | ||||
|             otp = kp.get_totp(selection['uuid']) | ||||
|             if otp: | ||||
|                 fake_key_raw(otp) | ||||
|             return | ||||
| 
 | ||||
|         if name and pw: | ||||
|             qute('jseval -q ' + make_js_code(name, pw)) | ||||
|     except Exception as e: | ||||
|         error(str(e)) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     main() | ||||
| 
 | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue