From 513e3b82636b4dde91ae787f412f6bc96077023a Mon Sep 17 00:00:00 2001 From: Tobias Reisinger Date: Sat, 19 Mar 2022 11:33:43 +0100 Subject: [PATCH] Add qutebrowser userscripts and minor fixes --- .config/aliasrc | 1 + .config/qutebrowser/generic.py | 5 +- .config/user-dirs.dirs | 1 + .../qutebrowser/userscripts/qute-clear-url | 12 +- .../qutebrowser/userscripts/qute-keepassxc | 414 ++++++++++++++++++ 5 files changed, 427 insertions(+), 6 deletions(-) create mode 100755 .local/share/qutebrowser/userscripts/qute-keepassxc diff --git a/.config/aliasrc b/.config/aliasrc index 09d8a7f..87b7bf9 100644 --- a/.config/aliasrc +++ b/.config/aliasrc @@ -28,6 +28,7 @@ alias du='du -h' alias free='free -h' +alias git-apply-clip='clip -out | base64 -d | git apply -' alias grep='grep --color=auto' alias grep-highlight='grep -e "^" -e' diff --git a/.config/qutebrowser/generic.py b/.config/qutebrowser/generic.py index 9107014..1911ce7 100644 --- a/.config/qutebrowser/generic.py +++ b/.config/qutebrowser/generic.py @@ -6,7 +6,10 @@ def init(c): c.colors.webpage.preferred_color_scheme = 'dark' #c.content.proxy = "socks://localhost:9050/" - #c.content.headers.accept_language = "en-IE,en;q=0.9" + c.content.headers.accept_language = "en-US,en;q=0.5" + c.content.headers.custom = { + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" + } c.downloads.position = 'bottom' diff --git a/.config/user-dirs.dirs b/.config/user-dirs.dirs index e6cd0e0..26096b2 100644 --- a/.config/user-dirs.dirs +++ b/.config/user-dirs.dirs @@ -7,5 +7,6 @@ XDG_MUSIC_DIR="$HOME/music" XDG_PICTURES_DIR="$HOME/pictures" XDG_VIDEOS_DIR="$HOME/videos" XDG_CONFIG_HOME="$HOME/.config" +XDG_CONFIG_DIR="$HOME/.config" XDG_CACHE_DIR="$HOME/.cache" XDG_DATA_HOME="$HOME/.local/share" diff --git a/.local/share/qutebrowser/userscripts/qute-clear-url b/.local/share/qutebrowser/userscripts/qute-clear-url index 8e202a5..b1bbe70 100755 --- a/.local/share/qutebrowser/userscripts/qute-clear-url +++ b/.local/share/qutebrowser/userscripts/qute-clear-url @@ -3,8 +3,6 @@ import os import argparse -import unalix - def qute(cmd): with open(os.environ['QUTE_FIFO'], 'w') as fifo: @@ -32,13 +30,17 @@ if __name__ == '__main__': args = parse_args() url = os.environ['QUTE_URL'] - clean_url = unalix.clear_url(url) + try: + import unalix + url = unalix.clear_url(url) + except: + pass if args.selection: - qute('yank inline "{}" -s'.format(clean_url)) + qute('yank inline "{}" -s'.format(url)) else: - qute('yank inline "{}"'.format(clean_url)) + qute('yank inline "{}"'.format(url)) except Exception as e: error(str(e)) diff --git a/.local/share/qutebrowser/userscripts/qute-keepassxc b/.local/share/qutebrowser/userscripts/qute-keepassxc new file mode 100755 index 0000000..59c1bab --- /dev/null +++ b/.local/share/qutebrowser/userscripts/qute-keepassxc @@ -0,0 +1,414 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2018-2021 Markus Blöchl +# +# This file is part of qutebrowser. +# +# qutebrowser is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# qutebrowser is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with qutebrowser. If not, see . + +""" +# Introduction + +This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database. + + +# Installation + +First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config. + + +Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3]. + + +Third, install the python module `pynacl`. + + +Finally, adapt your qutebrowser config. +You can e.g. add the following lines to your `~/.config/qutebrowser/config.py` +Remember to replace `ABC1234` with your actual GPG key. + +```python +config.bind('', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert') +config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal') +``` + + +# Usage + +If you are on a webpage with a login form, simply activate one of the configured key-bindings. + +The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension. +Just provide a name of your choice and accept the request if nothing looks fishy. + + +# How it works + +This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4]. + + +This script needs to store the key used to associate with your KeepassXC instance somewhere. +Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way +by storing the key in encrypted form using GPG. +Therefore you need to have a public-key-pair readily set up. + +GPG might then ask for your private-key password whenever you query the database for login credentials. + + +[1]: https://keepassxc.org/ +[2]: https://qutebrowser.org/ +[3]: https://gnupg.org/ +[4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md +[5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc +[6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration +""" + +import sys +import os +import socket +import json +import base64 +import shlex +import subprocess +import argparse + +import nacl.utils +import nacl.public + + +def parse_args(): + parser = argparse.ArgumentParser(description="Full passwords from KeepassXC") + parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL')) + parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()), + help='Path to KeepassXC browser socket') + parser.add_argument('--key', '-k', default='alice@example.com', + help='GPG key to encrypt KeepassXC auth key with') + parser.add_argument('--insecure', action='store_true', + help="Do not encrypt auth key") + parser.add_argument('--dmenu-invocation', '-d', default='rofi -dmenu', + help='Invocation used to execute a dmenu-provider') + parser.add_argument('--only-username', action='store_true', + help='Only insert username') + parser.add_argument('--only-password', action='store_true', + help='Only insert password') + parser.add_argument('--only-otp', action='store_true', + help='Only insert OTP code') + return parser.parse_args() + + +class KeepassError(Exception): + def __init__(self, code, desc): + self.code = code + self.description = desc + + def __str__(self): + return f"KeepassXC Error [{self.code}]: {self.description}" + + +class KeepassXC: + """ Wrapper around the KeepassXC socket API """ + def __init__(self, id=None, *, key, socket_path): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.id = id + self.socket_path = socket_path + self.client_key = nacl.public.PrivateKey.generate() + self.id_key = nacl.public.PrivateKey.from_seed(key) + self.cryptobox = None + + def connect(self): + if not os.path.exists(self.socket_path): + raise KeepassError(-1, "KeepassXC Browser socket does not exists") + self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8') + self.sock.connect(self.socket_path) + + self.send_raw_msg(dict( + action = 'change-public-keys', + publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), + nonce = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'), + clientID = self.client_id + )) + + resp = self.recv_raw_msg() + assert resp['action'] == 'change-public-keys' + assert resp['success'] == 'true' + assert resp['nonce'] + self.cryptobox = nacl.public.Box( + self.client_key, + nacl.public.PublicKey(base64.b64decode(resp['publicKey'])) + ) + + def get_databasehash(self): + self.send_msg(dict(action='get-databasehash')) + return self.recv_msg()['hash'] + + def lock_database(self): + self.send_msg(dict(action='lock-database')) + try: + self.recv_msg() + except KeepassError as e: + if e.code == 1: + return True + raise + return False + + + def test_associate(self): + if not self.id: + return False + self.send_msg(dict( + action = 'test-associate', + id = self.id, + key = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') + )) + return self.recv_msg()['success'] == 'true' + + def associate(self): + self.send_msg(dict( + action = 'associate', + key = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), + idKey = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') + )) + resp = self.recv_msg() + self.id = resp['id'] + + def get_logins(self, url): + self.send_msg(dict( + action = 'get-logins', + url = url, + keys = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }] + )) + return self.recv_msg()['entries'] + + def get_totp(self, uuid): + self.send_msg(dict( + action = 'get-totp', + uuid = uuid, + )) + return self.recv_msg()['totp'] + + def send_raw_msg(self, msg): + self.sock.send( json.dumps(msg).encode('utf-8') ) + + def recv_raw_msg(self): + return json.loads( self.sock.recv(4096).decode('utf-8') ) + + def send_msg(self, msg, **extra): + nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE) + self.send_raw_msg(dict( + action = msg['action'], + message = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'), + nonce = base64.b64encode(nonce).decode('utf-8'), + clientID = self.client_id, + **extra + )) + + def recv_msg(self): + resp = self.recv_raw_msg() + if 'error' in resp: + raise KeepassError(resp['errorCode'], resp['error']) + assert resp['action'] + return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8')) + + + +class SecretKeyStore: + def __init__(self, gpgkey): + self.gpgkey = gpgkey + if gpgkey is None: + self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key') + else: + self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg') + + def load(self): + "Load existing association key from file" + if self.gpgkey is None: + jsondata = open(self.path, 'r').read() + else: + jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8') + data = json.loads(jsondata) + self.id = data['id'] + self.key = base64.b64decode(data['key']) + + def create(self): + "Create new association key" + self.key = nacl.utils.random(32) + self.id = None + + def store(self, id): + "Store newly created association key in file" + self.id = id + jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')}) + if self.gpgkey is None: + open(self.path, "w").write(jsondata) + else: + subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True) + +def fake_key_raw(text): + for character in text: + # Escape all characters by default, space requires special handling + sequence = '" "' if character == ' ' else '\{}'.format(character) + qute('fake-key {}'.format(sequence)) + +def qute(cmd): + with open(os.environ['QUTE_FIFO'], 'w') as fifo: + fifo.write(cmd) + fifo.write('\n') + fifo.flush() + +def error(msg): + print(msg, file=sys.stderr) + qute('message-error "{}"'.format(msg)) + + +def connect_to_keepassxc(args): + assert args.key or args.insecure, "Missing GPG key to use for auth key encryption" + keystore = SecretKeyStore(args.key) + if os.path.isfile(keystore.path): + keystore.load() + kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket) + kp.connect() + if not kp.test_associate(): + error('No KeepassXC association') + return None + else: + keystore.create() + kp = KeepassXC(key=keystore.key, socket_path=args.socket) + kp.connect() + kp.associate() + if not kp.test_associate(): + error('No KeepassXC association') + return None + keystore.store(kp.id) + return kp + + +def make_js_code(username, password): + return ' '.join(""" + function isVisible(elem) { + var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null); + + if (style.getPropertyValue("visibility") !== "visible" || + style.getPropertyValue("display") === "none" || + style.getPropertyValue("opacity") === "0") { + return false; + } + + return elem.offsetWidth > 0 && elem.offsetHeight > 0; + }; + + function hasPasswordField(form) { + var inputs = form.getElementsByTagName("input"); + for (var j = 0; j < inputs.length; j++) { + var input = inputs[j]; + if (input.type === "password") { + return true; + } + } + return false; + }; + + function loadData2Form (form) { + var inputs = form.getElementsByTagName("input"); + for (var j = 0; j < inputs.length; j++) { + var input = inputs[j]; + if (isVisible(input) && (input.type === "text" || input.type === "email")) { + input.focus(); + input.value = %s; + input.dispatchEvent(new Event('input', { 'bubbles': true })); + input.dispatchEvent(new Event('change', { 'bubbles': true })); + input.blur(); + } + if (input.type === "password") { + input.focus(); + input.value = %s; + input.dispatchEvent(new Event('input', { 'bubbles': true })); + input.dispatchEvent(new Event('change', { 'bubbles': true })); + input.blur(); + } + } + }; + + function fillFirstForm() { + var forms = document.getElementsByTagName("form"); + for (i = 0; i < forms.length; i++) { + if (hasPasswordField(forms[i])) { + loadData2Form(forms[i]); + return; + } + } + alert("No Credentials Form found"); + }; + + fillFirstForm() + """.splitlines()) % (json.dumps(username), json.dumps(password)) + + +def dmenu(items, invocation): + command = shlex.split(invocation) + process = subprocess.run(command, input='\n'.join(items) + .encode('UTF-8'), stdout=subprocess.PIPE) + return process.stdout.decode('UTF-8').strip() + + +def main(): + if 'QUTE_FIFO' not in os.environ: + print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript") + sys.exit(-1) + + try: + args = parse_args() + assert args.url, "Missing URL" + kp = connect_to_keepassxc(args) + if not kp: + error('Could not connect to KeepassXC') + return + creds = kp.get_logins(args.url) + if not creds: + error('No credentials found') + return + + + selection = creds[0] + if len(creds) > 1: + login = dmenu(sorted(map(lambda c: c['login'], creds)), args.dmenu_invocation) + + for c in creds: + if c['login'] == login: + selection = c + break + + name, pw = selection['login'], selection['password'] + + if args.only_username: + if name: + fake_key_raw(name) + return + if args.only_password: + if pw: + fake_key_raw(pw) + return + if args.only_otp: + otp = kp.get_totp(selection['uuid']) + if otp: + fake_key_raw(otp) + return + + if name and pw: + qute('jseval -q ' + make_js_code(name, pw)) + except Exception as e: + error(str(e)) + + +if __name__ == '__main__': + main() +