rc3-counterbadge/counterbadge.py

227 lines
7.3 KiB
Python
Raw Normal View History

2022-01-01 01:18:13 +01:00
#!/usr/bin/env python3
import json
import sqlite3
import bottle
from bottle import run, get, abort, request, redirect
from beaker.middleware import SessionMiddleware
import urllib.request
import urllib.parse
from urllib.error import HTTPError
from uuid import uuid4
session_opts = {
'session.type': 'file',
'session.cookie_expires': 3600,
'session.data_dir': './data',
'session.auto': True,
'session.secure': True,
'session.samesite': 'None'
}
app = SessionMiddleware(bottle.app(), session_opts)
DB = sqlite3.connect('counterbadge.sqlite3')
DBC = DB.cursor()
try:
DBC.execute('''
CREATE TABLE IF NOT EXISTS counters (
id INTEGER PRIMARY KEY,
counter TEXT NOT NULL,
token TEXT NOT NULL,
username TEXT NOT NULL,
UNIQUE (counter, token, username)
)
''')
DBC.execute('''
CREATE INDEX IF NOT EXISTS idx_counters ON counters (counter, username)
''')
DB.commit()
except:
DB.rollback()
finally:
DBC.close()
def update_counter(counter, token, username):
cur = DB.cursor()
try:
changes = DB.total_changes
cur.execute('INSERT OR IGNORE INTO counters (counter, token, username) VALUES (:counter, :token, :username)', {
'counter': counter,
'token': token,
'username': username
})
changes = DB.total_changes - changes
cur.execute('SELECT COUNT(token) FROM counters WHERE counter = :counter AND username = :username', {
'counter': counter,
'username': username
})
rows = cur.fetchone()
DB.commit()
except BaseException as e:
DB.rollback()
raise e
finally:
cur.close()
return rows[0], changes > 0
def get_user(counterconf):
oauthconf = counterconf.get('oauth', {})
for key in ['client_id', 'client_secret', 'authorize_url', 'token_url', 'scope', 'redirect_url', 'response_type']:
if key not in oauthconf:
raise KeyError(key)
s = request.environ.get('beaker.session')
if 'username' in s:
return s['username']
s['redirect_url'] = request.url
s['state'] = uuid4()
query = {
'client_id': oauthconf['client_id'],
'response_type': oauthconf['response_type'],
'redirect_uri': oauthconf['redirect_url'],
'scope': oauthconf['scope'],
'state': s['state']
}
authorize = oauthconf['authorize_url'] + '?' + urllib.parse.urlencode(query)
redirect(authorize)
def redeem_code(oauthconf, code):
query = {
'grant_type': 'authorization_code',
'code': code,
'client_id': oauthconf['client_id'],
'client_secret': oauthconf['client_secret'],
'redirect_uri': oauthconf['redirect_url']
}
req = urllib.request.urlopen(oauthconf['token_url'], data=urllib.parse.urlencode(query).encode())
resp = json.loads(req.read().decode())
return resp.get('access_token')
def get_user_profile(counterconf, token):
headers = {
'Authorization': f'Bearer {token}'
}
req = urllib.request.Request(counterconf.get('profile_url'), headers=headers)
req = urllib.request.urlopen(req)
resp = json.loads(req.read().decode())
return resp.get('username')
@get('/counter/<counter>/redirect')
def oauth_redirect(counter):
with open('counterbadge.json', 'r') as f:
config = json.load(f)
counterconf = config.get('counters', {}).get(counter)
oauthconf = counterconf.get('oauth', {})
for key in ['client_id', 'client_secret', 'authorize_url', 'token_url', 'scope', 'redirect_url', 'response_type']:
if key not in oauthconf:
raise KeyError(key)
s = request.environ.get('beaker.session')
if 'username' in s and 'redirect_url' in s:
redirect(s['redirect_url'])
for key in ['code', 'state']:
if key not in request.query:
abort(400, 'Bad Request')
if 'state' not in s:
abort(400, 'Unknown state')
state = str(s['state'])
del s['state']
if state != request.query.state:
abort(401, 'Invalid state')
if 'token' in request.query:
token = request.query.token
else:
token = redeem_code(oauthconf, request.query.code)
username = None
if token:
username = get_user_profile(counterconf, token)
if username:
s['username'] = username
redirect(s['redirect_url'])
abort(401, 'Something went wrong' + token)
@get('/counter/<counter>/ping')
def ping(counter):
with open('counterbadge.json', 'r') as f:
config = json.load(f)
counterconf = config.get('counters', {}).get(counter)
username = get_user(counterconf)
if not username:
abort(401, 'Unautorized')
display_name = counterconf.get('display_name', counter)
return f'''
<h1>{display_name}</h1>
<p>
Hi {username}
</p>
'''
@get('/counter/<counter>/<token>')
def counter(counter, token):
with open('counterbadge.json', 'r') as f:
config = json.load(f)
counterconf = config.get('counters', {}).get(counter)
username = get_user(counterconf)
if not username:
abort(401, 'Unautorized')
display_name = counterconf.get('display_name', counter)
n_max = len(counterconf.get('unique_tokens', []))
if not counterconf:
abort(404, 'Not found')
if token not in counterconf.get('unique_tokens', []):
abort(404, 'Not found')
redeem_url = counterconf.get('redeem_url')
if not redeem_url:
abort(500, 'Internal Server Error')
api_token = counterconf.get('api_token')
if not api_token:
abort(500, 'Internal Server Error')
issue_at = counterconf.get('issue_at', {})
n, changed = update_counter(counter, token, username)
issued = False
for i, redeem_token in issue_at.items():
if int(i) <= n:
body = json.dumps({'token': redeem_token, 'username': username}).encode()
try:
headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'rc3-counterbadge/0.1 (https://git.kabelsalat.ch/rc3-counterbadge)',
}
request = urllib.request.Request(redeem_url, data=body, headers=headers)
response = urllib.request.urlopen(request)
resp = json.loads(response.read().decode())
# The API documentation is lying :( it doesn't in fact return
# whether the badge was actually issued
#issued = issued or resp.get('created', False)
if changed and int(i) == n:
issued = True
except HTTPError as e:
print(e, e.read().decode())
abort(500, 'Internal Server Error')
if issued:
if n < n_max:
issued = f'<p>Du hast eine Badge erhalten. Finde alle {n_max} Gegenstände, um mehr Badges zu erhalten.</p>'
else:
issued = f'<p>Du hast alle {n_max} Gegenstände gefunden und die letzte Badge erhalten. Herzlichen Glückwunsch!</p>'
else:
issued = ''
return f'''
<h1>{display_name}</h1>
<p>
Hi {username}
</p>
<p>
Du hast <strong>{n}</strong> von {n_max} Gegenstände gefunden!
</p>
{issued}
'''
if __name__ == '__main__':
run(app=app, host='127.0.0.1', port=8080)