erfamap/generate_map.py
s3lph 146775937e
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Solve the Luxembourg Problem
Luxembourg has two geoshapes registered on Wikidata, and the one considered truthy is not the one we want.
Solving this required messing with the SPARQL query as well as filtering to the now non-unique result set.
2022-10-09 01:15:36 +02:00

553 lines
22 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import urllib.request
import urllib.parse
import json
import base64
import sys
import argparse
import shutil
from lxml import etree
import pyproj
from geopy import Nominatim
import tqdm
import cairosvg
from PIL import Image, ImageFont
from PIL.ImageDraw import ImageDraw
USER_AGENT = 'Erfamap/v0.1 (https://git.kabelsalat.ch/s3lph/erfamap)'
class CachePaths:
def __init__(self, path: str):
if os.path.exists(path) and not os.path.isdir(path):
raise AttributeError(f'Path exists but is not a directory: {path}')
os.makedirs(path, exist_ok=True)
self.shapes_states = os.path.join(path, 'shapes_states')
self.shapes_countries = os.path.join(path, 'shapes_countries')
self.shapes_filtered = os.path.join(path, 'shapes_filtered')
self.erfa_info = os.path.join(path, 'erfa-info.json')
self.chaostreff_info = os.path.join(path, 'chaostreff-info.json')
#BOUNDING_BOX = [(4.27775, 46.7482713), (19.2403594, 54.9833021)]
# --bbox 4.27775 46.7482713 19.2403594 54.9833021
ERFA_URL = 'https://doku.ccc.de/Spezial:Semantische_Suche/format%3Djson/limit%3D50/link%3Dall/headers%3Dshow/searchlabel%3DJSON/class%3Dsortable-20wikitable-20smwtable/sort%3D/order%3Dasc/offset%3D0/-5B-5BKategorie:Erfa-2DKreise-5D-5D-20-5B-5BChaostreff-2DActive::wahr-5D-5D/-3FChaostreff-2DCity/-3FChaostreff-2DPhysical-2DAddress/-3FChaostreff-2DPhysical-2DHousenumber/-3FChaostreff-2DPhysical-2DPostcode/-3FChaostreff-2DPhysical-2DCity/-3FChaostreff-2DCountry/mainlabel%3D/prettyprint%3Dtrue/unescape%3Dtrue'
CHAOSTREFF_URL = 'https://doku.ccc.de/Spezial:Semantische_Suche/format%3Djson/limit%3D50/link%3Dall/headers%3Dshow/searchlabel%3DJSON/class%3Dsortable-20wikitable-20smwtable/sort%3D/order%3Dasc/offset%3D0/-5B-5BKategorie:Chaostreffs-5D-5D-20-5B-5BChaostreff-2DActive::wahr-5D-5D/-3FChaostreff-2DCity/-3FChaostreff-2DPhysical-2DAddress/-3FChaostreff-2DPhysical-2DHousenumber/-3FChaostreff-2DPhysical-2DPostcode/-3FChaostreff-2DPhysical-2DCity/-3FChaostreff-2DCountry/mainlabel%3D/prettyprint%3Dtrue/unescape%3Dtrue'
def sparql_query(query):
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': USER_AGENT,
'Accept': 'application/sparql-results+json',
}
body = urllib.parse.urlencode({'query': query}).encode()
req = urllib.request.Request('https://query.wikidata.org/sparql', headers=headers, data=body)
with urllib.request.urlopen(req) as resp:
resultset = json.load(resp)
results = {}
for r in resultset.get('results', {}).get('bindings'):
basename = None
url = None
for k, v in r.items():
if k == 'item':
basename = os.path.basename(v['value'])
elif k == 'map':
url = v['value']
results[basename] = url
return results
def fetch_geoshapes(target, shape_urls):
os.makedirs(target, exist_ok=True)
candidates = {}
keep = {}
for item, url in tqdm.tqdm(shape_urls.items()):
try:
with urllib.request.urlopen(url) as resp:
shape = json.load(resp)
candidates.setdefault(item, []).append(shape)
except urllib.error.HTTPError as e:
print(e)
for item, ican in candidates.items():
# Prefer zoom level 4
keep[item] = min(ican, key=lambda x: abs(4-x.get('zoom', 1000)))
for item, shape in keep.items():
with open(os.path.join(target, item + '.json'), 'w') as f:
json.dump(shape, f)
def fetch_wikidata_states(target):
shape_urls = sparql_query('''
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
SELECT DISTINCT ?item ?map WHERE {
# ?item is instance of federal state of germany and has geoshape ?map
?item wdt:P31 wd:Q1221156;
wdt:P3896 ?map
}
''')
print('Retrieving state border shapes')
fetch_geoshapes(target, shape_urls)
def fetch_wikidata_countries(target):
shape_urls = sparql_query('''
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX p: <http://www.wikidata.org/prop/>
PREFIX ps: <http://www.wikidata.org/prop/statement/>
PREFIX wikibase: <http://wikiba.se/ontology#>
SELECT DISTINCT ?item ?map WHERE {
# ?item is instance of sovereign state, transitively part of europe and has geoshape ?map
# ?item is instance of country or sovereign state
?item wdt:P31 ?stateclass.
# ?item is transitively part of Europe (Contintent) or EEA
?item wdt:P361+ ?euroclass.
# ?item has geoshape ?map (including all non-deprecated results)
?item p:P3896 ?st.
?st ps:P3896 ?map;
MINUS { ?st wikibase:rank wikibase:DeprecatedRank }
FILTER (?stateclass = wd:Q6256 || ?stateclass = wd:Q3624078).
FILTER (?euroclass = wd:Q46 || ?euroclass = wd:Q8932).
}
''')
print('Retrieving country border shapes')
fetch_geoshapes(target, shape_urls)
def filter_boundingbox(source, target, bbox):
files = os.listdir(source)
os.makedirs(target, exist_ok=True)
print('Filtering countries outside the bounding box')
for f in tqdm.tqdm(files):
if not f.endswith('.json') or 'Q183.json' in f:
continue
path = os.path.join(source, f)
with open(path, 'r') as sf:
shapedata = sf.read()
shape = json.loads(shapedata)
keep = False
geo = shape['data']['features'][0]['geometry']
if geo['type'] == 'Polygon':
geo['coordinates'] = [geo['coordinates']]
for poly in geo['coordinates']:
for point in poly[0]:
if point[0] >= bbox[0][0] and point[1] >= bbox[0][1] \
and point[0] <= bbox[1][0] and point[1] <= bbox[1][1]:
keep = True
break
if keep:
break
if keep:
with open(os.path.join(target, f), 'w') as sf:
sf.write(shapedata)
def address_lookup(name, erfa):
locator = Nominatim(user_agent=USER_AGENT)
number = erfa['Chaostreff-Physical-Housenumber']
street = erfa['Chaostreff-Physical-Address']
zipcode = erfa['Chaostreff-Physical-Postcode']
acity = erfa['Chaostreff-Physical-City']
city = erfa['Chaostreff-City'][0]
country = erfa['Chaostreff-Country'][0]
formats = [
# Muttenz, Schweiz
f'{city}, {country}'
]
if zipcode and acity:
# 4132 Muttenz, Schweiz
formats.insert(0, f'{zipcode[0]} {acity[0]}, {country}')
if zipcode and acity and number and street:
# Birsfelderstrasse 6, 4132 Muttenz, Schweiz
formats.insert(0, f'{street[0]} {number[0]}, {zipcode[0]} {acity[0]}, {country}')
for fmt in formats:
response = locator.geocode(fmt)
if response is not None:
return response.longitude, response.latitude
print(f'No location found for {name}, tried the following address formats:')
for fmt in formats:
print(f' {fmt}')
return None
def fetch_erfas(target, url):
userpw = os.getenv('DOKU_CCC_DE_BASICAUTH')
if userpw is None:
print('Please set environment variable DOKU_CCC_DE_BASICAUTH=username:password')
exit(1)
auth = base64.b64encode(userpw.encode()).decode()
erfas = {}
req = urllib.request.Request(url, headers={'Authorization': f'Basic {auth}'})
with urllib.request.urlopen(req) as resp:
erfadata = json.loads(resp.read().decode())
print('Looking up addresses')
for name, erfa in tqdm.tqdm(erfadata['results'].items()):
location = address_lookup(name, erfa['printouts'])
if location is None:
print(f'WARNING: No location for {name}')
city = erfa['printouts']['Chaostreff-City'][0]
erfas[city] = location
with open(target, 'w') as f:
json.dump(erfas, f)
def compute_bbox(ns):
if ns.bbox is not None:
return [
(min(ns.bbox[0], ns.bbox[2]), min(ns.bbox[1], ns.bbox[3])),
(max(ns.bbox[0], ns.bbox[2]), max(ns.bbox[1], ns.bbox[3]))
]
print('Computing map bounding box')
bounds = []
for path in tqdm.tqdm([ns.cache_directory.erfa_info, ns.cache_directory.chaostreff_info]):
with open(path, 'r') as f:
erfadata = json.load(f)
for lon, lat in erfadata.values():
if len(bounds) == 0:
bounds.append(lon)
bounds.append(lat)
bounds.append(lon)
bounds.append(lat)
else:
bounds[0] = min(bounds[0], lon)
bounds[1] = min(bounds[1], lat)
bounds[2] = max(bounds[2], lon)
bounds[3] = max(bounds[3], lat)
return [
(bounds[0] - ns.bbox_margin, bounds[1] - ns.bbox_margin),
(bounds[2] + ns.bbox_margin, bounds[3] + ns.bbox_margin)
]
class BoundingBox:
def __init__(self, left, top, right, bottom):
self.left = left
self.top = top
self.right = right
self.bottom = bottom
def __contains__(self, other):
if isinstance(other, BoundingBox):
if other.right < self.left or other.left > self.right:
return False
if other.bottom < self.top or other.top > self.bottom:
return False
return True
elif isinstance(other, tuple) or isinstance(other, list):
if len(other) != 2:
raise TypeError()
if self.left > other[0] or self.right < other[0]:
return False
if self.top > other[1] or self.bottom < other[1]:
return False
return True
raise TypeError()
@property
def width(self):
return self.right - self.left
@property
def height(self):
return self.bottom - self.top
def with_margin(self, margin):
return BoundingBox(self.left - margin, self.top - margin, self.right + margin, self.bottom + margin)
def optimize_text_layout(ns, erfas, chaostreffs, size, svg):
font = ImageFont.truetype(ns.font, ns.font_size)
pil = ImageDraw(Image.new('P', size))
lboxes = {}
rboxes = {}
for city, location in erfas.items():
lbox = pil.textbbox((location[0] + ns.font_distance, location[1] - ns.font_size/2), city, font=font, anchor='lt')
lbox = BoundingBox(*lbox)
rbox = pil.textbbox((location[0] - ns.font_distance, location[1] - ns.font_size/2), city, font=font, anchor='rt')
rbox = BoundingBox(*rbox)
lboxes[city] = lbox
rboxes[city] = rbox
if ns.debug:
dr1 = etree.Element('rect', x=str(lbox.left), y=str(lbox.top), width=str(lbox.width), height=str(lbox.height))
dr1.set('class', 'debugleft')
dr2 = etree.Element('rect', x=str(rbox.left), y=str(rbox.top), width=str(rbox.width), height=str(rbox.height))
dr2.set('class', 'debugright')
svg.append(dr1)
svg.append(dr2)
unfinished = {c for c in erfas.keys()}
finished = {}
for city in list(unfinished):
margin_left = lboxes[city].with_margin(ns.dotsize_erfa)
i_left = sum([1 for c, b in lboxes.items() if c != city and b in margin_left])
i_left += sum([1 for c, b in rboxes.items() if c != city and b in margin_left])
i_left += sum([0.5 for c, loc in chaostreffs.items() if loc in margin_left])
margin_right = rboxes[city].with_margin(ns.dotsize_erfa)
i_right = sum([1 for c, b in lboxes.items() if c != city and b in margin_right])
i_right += sum([1 for c, b in rboxes.items() if c != city and b in margin_right])
i_right += sum([0.5 for c, loc in chaostreffs.items() if loc in margin_right])
if i_right == 0 and i_left > 0:
finished[city] = rboxes[city]
unfinished.discard(city)
elif i_left == 0:
if lboxes[city].left > size[0] * 0.8:
finished[city] = rboxes[city]
else:
finished[city] = lboxes[city]
unfinished.discard(city)
for a in list(unfinished):
if a not in unfinished:
continue
mla = lboxes[a].with_margin(ns.dotsize_erfa)
mra = rboxes[a].with_margin(ns.dotsize_erfa)
for b in list(unfinished.difference({a})):
if b not in unfinished:
continue
mlb = lboxes[b]
mrb = rboxes[b]
if mla not in mlb and mla not in mlb and mra not in mlb and mra not in mrb:
continue
if mra in mlb:
finished[a] = lboxes[a]
finished[b] = rboxes[b]
unfinished.discard(a)
unfinished.discard(b)
elif mla in mrb:
finished[a] = rboxes[a]
finished[b] = lboxes[b]
unfinished.discard(a)
unfinished.discard(b)
for city in list(unfinished):
margin_left = lboxes[city].with_margin(ns.dotsize_erfa)
margin_right = rboxes[city].with_margin(ns.dotsize_erfa)
i_left = sum([1 for c, b in finished.items() if c != city and b in margin_left])
i_left += sum([0.5 for c, loc in chaostreffs.items() if loc in margin_left])
i_right = sum([1 for c, b in finished.items() if c != city and b in margin_right])
i_right += sum([0.5 for c, loc in chaostreffs.items() if loc in margin_right])
if i_left <= i_right:
finished[city] = lboxes[city]
else:
finished[city] = rboxes[city]
unfinished.discard(city)
return finished
def create_svg(ns, bbox):
print('Creating SVG image')
# Convert from WGS84 lon, lat to chosen projection
transformer = pyproj.Transformer.from_crs('epsg:4326', ns.projection)
scalex = ns.scale_x
scaley = ns.scale_y
blt = transformer.transform(*bbox[0])
trt = transformer.transform(*bbox[1])
trans_bounding_box = [
(scalex*blt[0], scaley*trt[1]),
(scalex*trt[0], scaley*blt[1])
]
origin = trans_bounding_box[0]
svg_box = (trans_bounding_box[1][0] - origin[0], origin[1] - trans_bounding_box[1][1])
shapes_states = []
files = os.listdir(ns.cache_directory.shapes_states)
for f in files:
if not f.endswith('.json'):
continue
path = os.path.join(ns.cache_directory.shapes_states, f)
with open(path, 'r') as sf:
shapedata = sf.read()
shape = json.loads(shapedata)
geo = shape['data']['features'][0]['geometry']
if geo['type'] == 'Polygon':
geo['coordinates'] = [geo['coordinates']]
for poly in geo['coordinates']:
ts = []
for x, y in poly[0]:
xt, yt = transformer.transform(x, y)
ts.append((xt*scalex - origin[0], origin[1] - yt*scaley))
shapes_states.append(ts)
shapes_countries = []
files = os.listdir(ns.cache_directory.shapes_filtered)
for f in files:
if not f.endswith('.json'):
continue
path = os.path.join(ns.cache_directory.shapes_filtered, f)
with open(path, 'r') as sf:
shapedata = sf.read()
shape = json.loads(shapedata)
geo = shape['data']['features'][0]['geometry']
if geo['type'] == 'Polygon':
geo['coordinates'] = [geo['coordinates']]
for poly in geo['coordinates']:
ts = []
for x, y in poly[0]:
xt, yt = transformer.transform(x, y)
ts.append((xt*scalex - origin[0], origin[1] - yt*scaley))
shapes_countries.append(ts)
chaostreffs = {}
with open(ns.cache_directory.chaostreff_info, 'r') as f:
ctdata = json.load(f)
for city, location in ctdata.items():
if location is None:
continue
xt, yt = transformer.transform(*location)
chaostreffs[city] = (xt*scalex - origin[0], origin[1] - yt*scaley)
erfas = {}
with open(ns.cache_directory.erfa_info, 'r') as f:
ctdata = json.load(f)
for city, location in ctdata.items():
if location is None:
continue
xt, yt = transformer.transform(*location)
erfas[city] = (xt*scalex - origin[0], origin[1] - yt*scaley)
rectbox = [0, 0, svg_box[0], svg_box[1]]
for shape in shapes_states + shapes_countries:
for lon, lat in shape:
rectbox[0] = min(lon, rectbox[0])
rectbox[1] = min(lat, rectbox[1])
rectbox[2] = max(lon, rectbox[2])
rectbox[3] = max(lat, rectbox[3])
svg = etree.Element('svg',
xmlns='http://www.w3.org/2000/svg',
viewBox=f'0 0 {svg_box[0]} {svg_box[1]}',
width=str(svg_box[0]), height=str(svg_box[1]))
defs = etree.Element('defs')
style = etree.Element('style', type='text/css')
style.text = f'@import url({ns.stylesheet})'
defs.append(style)
svg.append(defs)
bg = etree.Element('rect',
x=str(rectbox[0]),
y=str(rectbox[1]),
width=str(rectbox[3]-rectbox[1]),
height=str(rectbox[2]-rectbox[0]))
bg.set('class', 'background')
svg.append(bg)
for shape in shapes_countries:
points = ' '.join([f'{lon},{lat}' for lon, lat in shape])
poly = etree.Element('polygon', points=points)
poly.set('class', 'country')
svg.append(poly)
# Render shortest shapes last s.t. Berlin, Hamburg and Bremen are rendered on top of their surrounding states
for shape in sorted(shapes_states, key=lambda x: -sum(len(s) for s in x)):
points = ' '.join([f'{lon},{lat}' for lon, lat in shape])
poly = etree.Element('polygon', points=points)
poly.set('class', 'state')
svg.append(poly)
for city, location in erfas.items():
circle = etree.Element('circle', cx=str(location[0]), cy=str(location[1]), r=str(ns.dotsize_erfa))
circle.set('class', 'erfa')
svg.append(circle)
for city, location in chaostreffs.items():
circle = etree.Element('circle', cx=str(location[0]), cy=str(location[1]), r=str(ns.dotsize_treff))
circle.set('class', 'chaostreff')
svg.append(circle)
print('Layouting labels')
texts = optimize_text_layout(ns, erfas, chaostreffs, (int(svg_box[0]), int(svg_box[1])), svg)
for city, box in texts.items():
text = etree.Element('text', x=str(box.left), y=str(box.top))
text.set('alignment-baseline', 'hanging')
text.set('class', 'erfalabel')
text.text = city
svg.append(text)
print('Done, writing SVG')
with open('map.svg', 'wb') as mapfile:
root = etree.ElementTree(svg)
root.write(mapfile)
print('Done, writing PNG')
cairosvg.svg2png(url='map.svg', write_to='map.png')
print('Done')
def main():
ap = argparse.ArgumentParser(sys.argv[0])
ap.add_argument('--cache-directory', type=CachePaths, default='cache', help='Path to the cache directory')
ap.add_argument('--update-borders', action='store_true', default=False, help='Update country borders from Wikidata')
ap.add_argument('--update-erfalist', action='store_true', default=False, help='Update Erfa/Chaostreff list from doku.ccc.de')
ap.add_argument('--bbox', type=float, nargs=4, default=None, help='Override map bounding box with <lon> <lat> <lon> <lat>')
ap.add_argument('--bbox-margin', type=float, default=0.3, help='Margin around the inferred bounding box. Ignored with --bbox')
ap.add_argument('--stylesheet', type=str, default='style/erfamap.css', help='Stylesheet for the generated SVG')
ap.add_argument('--font', type=str, default='style/concertone-regular.ttf', help='Name of the font used in the stylesheet.')
ap.add_argument('--font-size', type=int, default=30, help='Size of the font used in the stylesheet.')
ap.add_argument('--font-distance', type=int, default=25, help='Distance of labels from their dots center')
ap.add_argument('--dotsize-erfa', type=float, default=13, help='Radius of Erfa dots')
ap.add_argument('--dotsize-treff', type=float, default=8, help='Radius of Chaostreff dots')
ap.add_argument('--projection', type=str, default='epsg:4258', help='Map projection to convert the WGS84 coordinates to')
ap.add_argument('--scale-x', type=float, default=130, help='X axis scale to apply after projecting')
ap.add_argument('--scale-y', type=float, default=200, help='Y axis scale to apply after projecting')
ap.add_argument('--debug', action='store_true', default=False, help='Add debug information to the produced SVG')
ns = ap.parse_args(sys.argv[1:])
if ns.update_borders or not os.path.isdir(ns.cache_directory.shapes_countries):
if os.path.isdir(ns.cache_directory.shapes_countries):
shutil.rmtree(ns.cache_directory.shapes_countries)
fetch_wikidata_countries(target=ns.cache_directory.shapes_countries)
if ns.update_borders or not os.path.isdir(ns.cache_directory.shapes_states):
if os.path.isdir(ns.cache_directory.shapes_states):
shutil.rmtree(ns.cache_directory.shapes_states)
fetch_wikidata_states(target=ns.cache_directory.shapes_states)
if ns.update_erfalist or not os.path.isfile(ns.cache_directory.erfa_info):
if os.path.exists(ns.cache_directory.erfa_info):
os.unlink(ns.cache_directory.erfa_info)
fetch_erfas(target=ns.cache_directory.erfa_info, url=ERFA_URL)
if ns.update_erfalist or not os.path.isfile(ns.cache_directory.chaostreff_info):
if os.path.exists(ns.cache_directory.chaostreff_info):
os.unlink(ns.cache_directory.chaostreff_info)
fetch_erfas(target=ns.cache_directory.chaostreff_info, url=CHAOSTREFF_URL)
bbox = compute_bbox(ns)
filter_boundingbox(ns.cache_directory.shapes_countries, ns.cache_directory.shapes_filtered, bbox)
create_svg(ns, bbox)
if __name__ == '__main__':
main()