#!/usr/bin/env python3 # # https://git.kabelsalat.ch/s3lph/erfamap import os import urllib.request import urllib.parse import json import base64 import sys import argparse import shutil import random from lxml import etree import pyproj from geopy import Nominatim import tqdm import cairosvg from PIL import Image, ImageFont from PIL.ImageDraw import ImageDraw USER_AGENT = 'Erfamap/v0.1 (https://git.kabelsalat.ch/s3lph/erfamap)' class CachePaths: def __init__(self, path: str): if os.path.exists(path) and not os.path.isdir(path): raise AttributeError(f'Path exists but is not a directory: {path}') os.makedirs(path, exist_ok=True) self.shapes_states = os.path.join(path, 'shapes_states') self.shapes_countries = os.path.join(path, 'shapes_countries') self.shapes_filtered = os.path.join(path, 'shapes_filtered') self.erfa_info = os.path.join(path, 'erfa-info.json') self.chaostreff_info = os.path.join(path, 'chaostreff-info.json') ERFA_URL = 'https://doku.ccc.de/Spezial:Semantische_Suche/format%3Djson/limit%3D50/link%3Dall/headers%3Dshow/searchlabel%3DJSON/class%3Dsortable-20wikitable-20smwtable/sort%3D/order%3Dasc/offset%3D0/-5B-5BKategorie:Erfa-2DKreise-5D-5D-20-5B-5BChaostreff-2DActive::wahr-5D-5D/-3FChaostreff-2DCity/-3FChaostreff-2DPhysical-2DAddress/-3FChaostreff-2DPhysical-2DHousenumber/-3FChaostreff-2DPhysical-2DPostcode/-3FChaostreff-2DPhysical-2DCity/-3FChaostreff-2DCountry/mainlabel%3D/prettyprint%3Dtrue/unescape%3Dtrue' CHAOSTREFF_URL = 'https://doku.ccc.de/Spezial:Semantische_Suche/format%3Djson/limit%3D50/link%3Dall/headers%3Dshow/searchlabel%3DJSON/class%3Dsortable-20wikitable-20smwtable/sort%3D/order%3Dasc/offset%3D0/-5B-5BKategorie:Chaostreffs-5D-5D-20-5B-5BChaostreff-2DActive::wahr-5D-5D/-3FChaostreff-2DCity/-3FChaostreff-2DPhysical-2DAddress/-3FChaostreff-2DPhysical-2DHousenumber/-3FChaostreff-2DPhysical-2DPostcode/-3FChaostreff-2DPhysical-2DCity/-3FChaostreff-2DCountry/mainlabel%3D/prettyprint%3Dtrue/unescape%3Dtrue' def sparql_query(query): headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'User-Agent': USER_AGENT, 'Accept': 'application/sparql-results+json', } body = urllib.parse.urlencode({'query': query}).encode() req = urllib.request.Request('https://query.wikidata.org/sparql', headers=headers, data=body) with urllib.request.urlopen(req) as resp: resultset = json.load(resp) results = {} for r in resultset.get('results', {}).get('bindings'): basename = None url = None for k, v in r.items(): if k == 'item': basename = os.path.basename(v['value']) elif k == 'map': url = v['value'] results[basename] = url return results def fetch_geoshapes(target, shape_urls): os.makedirs(target, exist_ok=True) candidates = {} keep = {} for item, url in tqdm.tqdm(shape_urls.items()): try: with urllib.request.urlopen(url) as resp: shape = json.load(resp) if not shape.get('license', 'proprietary').startswith('CC0-'): # Only include public domain data continue candidates.setdefault(item, []).append(shape) except urllib.error.HTTPError as e: print(e) for item, ican in candidates.items(): # Prefer zoom level 4 keep[item] = min(ican, key=lambda x: abs(4-x.get('zoom', 1000))) for item, shape in keep.items(): with open(os.path.join(target, item + '.json'), 'w') as f: json.dump(shape, f) def fetch_wikidata_states(target): shape_urls = sparql_query(''' PREFIX wd: PREFIX wdt: SELECT DISTINCT ?item ?map WHERE { # ?item is instance of federal state of germany and has geoshape ?map ?item wdt:P31 wd:Q1221156; wdt:P3896 ?map } ''') print('Retrieving state border shapes') fetch_geoshapes(target, shape_urls) def fetch_wikidata_countries(target): shape_urls = sparql_query(''' PREFIX wd: PREFIX wdt: PREFIX p: PREFIX ps: PREFIX wikibase: SELECT DISTINCT ?item ?map WHERE { # ?item is instance of sovereign state, transitively part of europe and has geoshape ?map # ?item is instance of country or sovereign state ?item wdt:P31 ?stateclass. # ?item is transitively part of Europe (Contintent) or EEA ?item wdt:P361+ ?euroclass. # ?item has geoshape ?map (including all non-deprecated results) ?item p:P3896 ?st. ?st ps:P3896 ?map; MINUS { ?st wikibase:rank wikibase:DeprecatedRank } FILTER (?stateclass = wd:Q6256 || ?stateclass = wd:Q3624078). FILTER (?euroclass = wd:Q46 || ?euroclass = wd:Q8932). } ''') print('Retrieving country border shapes') fetch_geoshapes(target, shape_urls) def filter_boundingbox(source, target, bbox): files = os.listdir(source) os.makedirs(target, exist_ok=True) print('Filtering countries outside the bounding box') for f in tqdm.tqdm(files): if not f.endswith('.json') or 'Q183.json' in f: continue path = os.path.join(source, f) with open(path, 'r') as sf: shapedata = sf.read() shape = json.loads(shapedata) keep = False geo = shape['data']['features'][0]['geometry'] if geo['type'] == 'Polygon': geo['coordinates'] = [geo['coordinates']] for poly in geo['coordinates']: for point in poly[0]: if point[0] >= bbox[0][0] and point[1] >= bbox[0][1] \ and point[0] <= bbox[1][0] and point[1] <= bbox[1][1]: keep = True break if keep: break if keep: with open(os.path.join(target, f), 'w') as sf: sf.write(shapedata) def address_lookup(name, erfa): locator = Nominatim(user_agent=USER_AGENT) number = erfa['Chaostreff-Physical-Housenumber'] street = erfa['Chaostreff-Physical-Address'] zipcode = erfa['Chaostreff-Physical-Postcode'] acity = erfa['Chaostreff-Physical-City'] city = erfa['Chaostreff-City'][0] country = erfa['Chaostreff-Country'][0] formats = [ # Muttenz, Schweiz f'{city}, {country}' ] if zipcode and acity: # 4132 Muttenz, Schweiz formats.insert(0, f'{zipcode[0]} {acity[0]}, {country}') if zipcode and acity and number and street: # Birsfelderstrasse 6, 4132 Muttenz, Schweiz formats.insert(0, f'{street[0]} {number[0]}, {zipcode[0]} {acity[0]}, {country}') for fmt in formats: response = locator.geocode(fmt) if response is not None: return response.longitude, response.latitude print(f'No location found for {name}, tried the following address formats:') for fmt in formats: print(f' {fmt}') return None def fetch_erfas(target, url): userpw = os.getenv('DOKU_CCC_DE_BASICAUTH') if userpw is None: print('Please set environment variable DOKU_CCC_DE_BASICAUTH=username:password') exit(1) auth = base64.b64encode(userpw.encode()).decode() erfas = {} req = urllib.request.Request(url, headers={'Authorization': f'Basic {auth}'}) with urllib.request.urlopen(req) as resp: erfadata = json.loads(resp.read().decode()) print('Looking up addresses') for name, erfa in tqdm.tqdm(erfadata['results'].items()): location = address_lookup(name, erfa['printouts']) if location is None: print(f'WARNING: No location for {name}') city = erfa['printouts']['Chaostreff-City'][0] erfas[city] = location with open(target, 'w') as f: json.dump(erfas, f) def compute_bbox(ns): if ns.bbox is not None: return [ (min(ns.bbox[0], ns.bbox[2]), min(ns.bbox[1], ns.bbox[3])), (max(ns.bbox[0], ns.bbox[2]), max(ns.bbox[1], ns.bbox[3])) ] print('Computing map bounding box') bounds = [] for path in tqdm.tqdm([ns.cache_directory.erfa_info, ns.cache_directory.chaostreff_info]): with open(path, 'r') as f: erfadata = json.load(f) for lon, lat in erfadata.values(): if len(bounds) == 0: bounds.append(lon) bounds.append(lat) bounds.append(lon) bounds.append(lat) else: bounds[0] = min(bounds[0], lon) bounds[1] = min(bounds[1], lat) bounds[2] = max(bounds[2], lon) bounds[3] = max(bounds[3], lat) return [ (bounds[0] - ns.bbox_margin, bounds[1] - ns.bbox_margin), (bounds[2] + ns.bbox_margin, bounds[3] + ns.bbox_margin) ] class BoundingBox: def __init__(self, left, top, right, bottom, meta=None, base_weight=0): self.left = left self.top = top self.right = right self.bottom = bottom self.meta = meta self.base_weight = base_weight self.finished = False def __contains__(self, other): if isinstance(other, BoundingBox): if other.right < self.left or other.left > self.right: return False if other.bottom < self.top or other.top > self.bottom: return False return True elif isinstance(other, tuple) or isinstance(other, list): if len(other) != 2: raise TypeError() if self.left > other[0] or self.right < other[0]: return False if self.top > other[1] or self.bottom < other[1]: return False return True raise TypeError() def __add__(self, other): if not isinstance(other, tuple) and not isinstance(other, list): raise TypeError() return BoundingBox(self.left + other[0], self.top + other[1], self.right + other[0], self.bottom + other[1], self.meta, self.base_weight) def __iadd__(self, other): if not isinstance(other, tuple) and not isinstance(other, list): raise TypeError() self.left += other[0] self.top += other[1] self.right += other[0] self.bottom += other[1] return self def __sub__(self, other): if not isinstance(other, tuple) and not isinstance(other, list): raise TypeError() return BoundingBox(self.left - other[0], self.top - other[1], self.right - other[0], self.bottom - other[1], self.meta, self.base_weight) def __isub__(self, other): if not isinstance(other, tuple) and not isinstance(other, list): raise TypeError() self.left -= other[0] self.top -= other[1] self.right -= other[0] self.bottom -= other[1] self._weight = None return self @property def width(self): return self.right - self.left @property def height(self): return self.bottom - self.top @property def center(self): return (self.left + self.width/2, self.top + self.height/2) def distance2(self, other): c1 = self.center c2 = other.center return (c1[0] - c2[0])**2 + (c1[1] - c2[1])**2 def with_margin(self, margin): return BoundingBox(self.left - margin, self.top - margin, self.right + margin, self.bottom + margin, self.meta, self.base_weight) def compute_weight(self, other, erfas, chaostreffs, ns): w = 0 swm = self.with_margin(ns.font_distance) swe = self.with_margin(ns.dotsize_erfa) swc = self.with_margin(ns.dotsize_treff) swme = swm.with_margin(ns.dotsize_erfa) swmc = swm.with_margin(ns.dotsize_treff) for o in other: if o.meta['city'] == self.meta['city']: continue if o in self: if o.finished: w += 50 else: w += 3 if o in swm: if o.finished: w += 3 else: w += 1 for city, location in erfas.items(): if city == self.meta['city']: continue if location in swe: w += 15 if location in swme: w += 2 for city, location in chaostreffs.items(): if city == self.meta['city']: continue if location in swc: w += 5 if location in swmc: w += 1 self._weight = w def compute_finalized_weight(self, other, erfas, chaostreffs, ns): w = 0 swm = self.with_margin(ns.font_distance) swe = self.with_margin(ns.dotsize_erfa) swc = self.with_margin(ns.dotsize_treff) swme = swm.with_margin(ns.dotsize_erfa) swmc = swm.with_margin(ns.dotsize_treff) for o in other: if o.meta['city'] == self.meta['city']: continue if o in self: w += 100 if o in swm: w += 15 for city, location in erfas.items(): if city == self.meta['city']: continue if location in swe: w += 100 if location in swme: w += 10 for city, location in chaostreffs.items(): if city == self.meta['city']: continue if location in swc: w += 100 if location in swmc: w += 5 self._weight = w @property def weight(self): return self._weight + self.base_weight @property def is_optimal(self): return self._weight == 0 def optimize_text_layout(ns, erfas, chaostreffs, size, svg): font = ImageFont.truetype(ns.font, ns.font_size) pil = ImageDraw(Image.new('P', size)) candidates = {} for city, location in erfas.items(): text = city for rfrom, to in ns.rename: if rfrom == city: text = to break rbox = pil.textbbox((location[0] + ns.font_distance, location[1] - ns.font_size/2), text, font=font, anchor='lt') rbox = BoundingBox(*rbox, meta={'city': city, 'side': 'r', 'text': text}) rbox.right += 1.5 * ns.font_distance lbox = pil.textbbox((location[0] - ns.font_distance, location[1] - ns.font_size/2), text, font=font, anchor='rt') lbox = BoundingBox(*lbox, meta={'city': city, 'side': 'l', 'text': text}) lbox.left -= 1.5 * ns.font_distance if location[0] > 0.8 * size[0]: rbox.base_weight += 0.001 else: lbox.base_weight += 0.001 candidates[city] = [lbox, lbox + (0, ns.dotsize_erfa), lbox + (0, ns.dotsize_erfa*2), lbox - (0, ns.dotsize_erfa), lbox - (0, ns.dotsize_erfa*2), rbox, rbox + (0, ns.dotsize_erfa), rbox + (0, ns.dotsize_erfa*2), rbox - (0, ns.dotsize_erfa), rbox - (0, ns.dotsize_erfa*2)] rbox.base_weight -= 0.003 lbox.base_weight -= 0.003 if ns.debug: for c in candidates[city]: dr = etree.Element('rect', x=str(c.left), y=str(c.top), width=str(c.width), height=str(c.height)) dr.set('class', 'debugleft') svg.append(dr) unfinished = {c for c in erfas.keys()} finished = {} while len(unfinished) > 0: all_boxes = set(finished.values()) unfinished_boxes = set() for city in unfinished: all_boxes.update(candidates[city]) unfinished_boxes.update(candidates[city]) for box in all_boxes: box.compute_weight(all_boxes, erfas, chaostreffs, ns) # Get candidate with the least number of optimal solutions > 0 optcity = min(unfinished, key=lambda city: len([1 for box in candidates[city] if box.is_optimal]) or float('inf')) optboxes = [box for box in candidates[optcity] if box.is_optimal] if len(optboxes) > 0: finished[optcity] = min(optboxes, key=lambda box: box.weight) finished[optcity].finished = True unfinished.discard(optcity) continue # If no candidate with at least one optimal solution is left, go by global minimum minbox = min(unfinished_boxes, key=lambda box: box.weight) mincity = minbox.meta['city'] finished[mincity] = minbox minbox.finished = True unfinished.discard(mincity) # Iteratively improve the layout for i in tqdm.tqdm(range(len(finished))): changed = False order = list(finished.keys()) random.shuffle(order) for city in order: all_boxes = set(finished.values()) for box in candidates[city]: box.compute_finalized_weight(all_boxes, erfas, chaostreffs, ns) minbox = min(candidates[city], key=lambda box: box.weight) if minbox is not finished[city]: changed = True finished[city] = minbox minbox.finished = True if not changed: # Nothing changed in this iteration - no need to retry break return finished def create_svg(ns, bbox): print('Creating SVG image') # Convert from WGS84 lon, lat to chosen projection transformer = pyproj.Transformer.from_crs('epsg:4326', ns.projection) scalex = ns.scale_x scaley = ns.scale_y blt = transformer.transform(*bbox[0]) trt = transformer.transform(*bbox[1]) trans_bounding_box = [ (scalex*blt[0], scaley*trt[1]), (scalex*trt[0], scaley*blt[1]) ] origin = trans_bounding_box[0] svg_box = (trans_bounding_box[1][0] - origin[0], origin[1] - trans_bounding_box[1][1]) shapes_states = [] files = os.listdir(ns.cache_directory.shapes_states) for f in files: if not f.endswith('.json'): continue path = os.path.join(ns.cache_directory.shapes_states, f) with open(path, 'r') as sf: shapedata = sf.read() shape = json.loads(shapedata) name = shape['description']['en'] geo = shape['data']['features'][0]['geometry'] if geo['type'] == 'Polygon': geo['coordinates'] = [geo['coordinates']] for poly in geo['coordinates']: ts = [] for x, y in poly[0]: xt, yt = transformer.transform(x, y) ts.append((xt*scalex - origin[0], origin[1] - yt*scaley)) shapes_states.append((name, ts)) shapes_countries = [] files = os.listdir(ns.cache_directory.shapes_filtered) for f in files: if not f.endswith('.json'): continue path = os.path.join(ns.cache_directory.shapes_filtered, f) with open(path, 'r') as sf: shapedata = sf.read() shape = json.loads(shapedata) name = shape['description']['en'] geo = shape['data']['features'][0]['geometry'] if geo['type'] == 'Polygon': geo['coordinates'] = [geo['coordinates']] for poly in geo['coordinates']: ts = [] for x, y in poly[0]: xt, yt = transformer.transform(x, y) ts.append((xt*scalex - origin[0], origin[1] - yt*scaley)) shapes_countries.append((name, ts)) chaostreffs = {} with open(ns.cache_directory.chaostreff_info, 'r') as f: ctdata = json.load(f) for city, location in ctdata.items(): if location is None: continue xt, yt = transformer.transform(*location) chaostreffs[city] = (xt*scalex - origin[0], origin[1] - yt*scaley) erfas = {} with open(ns.cache_directory.erfa_info, 'r') as f: ctdata = json.load(f) for city, location in ctdata.items(): if location is None: continue xt, yt = transformer.transform(*location) erfas[city] = (xt*scalex - origin[0], origin[1] - yt*scaley) rectbox = [0, 0, svg_box[0], svg_box[1]] for name, shape in shapes_states + shapes_countries: for lon, lat in shape: rectbox[0] = min(lon, rectbox[0]) rectbox[1] = min(lat, rectbox[1]) rectbox[2] = max(lon, rectbox[2]) rectbox[3] = max(lat, rectbox[3]) svg = etree.Element('svg', xmlns='http://www.w3.org/2000/svg', viewBox=f'0 0 {svg_box[0]} {svg_box[1]}', width=str(svg_box[0]), height=str(svg_box[1])) defs = etree.Element('defs') style = etree.Element('style', type='text/css') style.text = f'@import url({ns.stylesheet})' defs.append(style) svg.append(defs) bg = etree.Element('rect', id='background', x=str(rectbox[0]), y=str(rectbox[1]), width=str(rectbox[3]-rectbox[1]), height=str(rectbox[2]-rectbox[0])) bg.set('class', 'background') svg.append(bg) for name, shape in shapes_countries: points = ' '.join([f'{lon},{lat}' for lon, lat in shape]) poly = etree.Element('polygon', points=points) poly.set('class', 'country') poly.set('data-country', name) svg.append(poly) # Render shortest shapes last s.t. Berlin, Hamburg and Bremen are rendered on top of their surrounding states for name, shape in sorted(shapes_states, key=lambda x: -sum(len(s) for s in x[1])): points = ' '.join([f'{lon},{lat}' for lon, lat in shape]) poly = etree.Element('polygon', points=points) poly.set('class', 'state') poly.set('data-state', name) svg.append(poly) for city, location in erfas.items(): circle = etree.Element('circle', cx=str(location[0]), cy=str(location[1]), r=str(ns.dotsize_erfa)) circle.set('class', 'erfa') circle.set('data-erfa', city) svg.append(circle) for city, location in chaostreffs.items(): circle = etree.Element('circle', cx=str(location[0]), cy=str(location[1]), r=str(ns.dotsize_treff)) circle.set('class', 'chaostreff') circle.set('data-chaostreff', city) svg.append(circle) print('Layouting labels') texts = optimize_text_layout(ns, erfas, chaostreffs, (int(svg_box[0]), int(svg_box[1])), svg) for city, box in texts.items(): if box.meta['side'] == 'l': box += (1.5 * ns.font_distance, 0) text = etree.Element('text', x=str(box.left), y=str(box.top)) text.set('alignment-baseline', 'hanging') text.set('class', 'erfalabel') text.set('data-erfa', city) text.text = box.meta['text'] svg.append(text) print('Done, writing SVG') with open('map.svg', 'wb') as mapfile: root = etree.ElementTree(svg) root.write(mapfile) print('Done, writing PNG') cairosvg.svg2png(url='map.svg', write_to='map.png') print('Done') def main(): ap = argparse.ArgumentParser(sys.argv[0]) ap.add_argument('--cache-directory', type=CachePaths, default='cache', help='Path to the cache directory') ap.add_argument('--update-borders', action='store_true', default=False, help='Update country borders from Wikidata') ap.add_argument('--update-erfalist', action='store_true', default=False, help='Update Erfa/Chaostreff list from doku.ccc.de') ap.add_argument('--bbox', type=float, nargs=4, metavar=('LON', 'LAT', 'LON', 'LAT') ,default=None, help='Override map bounding box') ap.add_argument('--bbox-margin', type=float, default=0.3, help='Margin around the inferred bounding box. Ignored with --bbox') ap.add_argument('--stylesheet', type=str, default='style/erfamap.css', help='Stylesheet for the generated SVG') ap.add_argument('--font', type=str, default='style/concertone-regular.ttf', help='Name of the font used in the stylesheet.') ap.add_argument('--font-size', type=int, default=40, help='Size of the font used in the stylesheet.') ap.add_argument('--font-distance', type=int, default=15, help='Distance of labels from their dots center') ap.add_argument('--dotsize-erfa', type=float, default=13, help='Radius of Erfa dots') ap.add_argument('--dotsize-treff', type=float, default=8, help='Radius of Chaostreff dots') ap.add_argument('--rename', type=str, action='append', nargs=2, metavar=('FROM', 'TO'), default=[], help='Rename a city with an overly long name (e.g. "Rothenburg ob der Tauber" to "Rothenburg")') ap.add_argument('--projection', type=str, default='epsg:4258', help='Map projection to convert the WGS84 coordinates to') ap.add_argument('--scale-x', type=float, default=130, help='X axis scale to apply after projecting') ap.add_argument('--scale-y', type=float, default=200, help='Y axis scale to apply after projecting') ap.add_argument('--debug', action='store_true', default=False, help='Add debug information to the produced SVG') ns = ap.parse_args(sys.argv[1:]) if ns.update_borders or not os.path.isdir(ns.cache_directory.shapes_countries): if os.path.isdir(ns.cache_directory.shapes_countries): shutil.rmtree(ns.cache_directory.shapes_countries) fetch_wikidata_countries(target=ns.cache_directory.shapes_countries) if ns.update_borders or not os.path.isdir(ns.cache_directory.shapes_states): if os.path.isdir(ns.cache_directory.shapes_states): shutil.rmtree(ns.cache_directory.shapes_states) fetch_wikidata_states(target=ns.cache_directory.shapes_states) if ns.update_erfalist or not os.path.isfile(ns.cache_directory.erfa_info): if os.path.exists(ns.cache_directory.erfa_info): os.unlink(ns.cache_directory.erfa_info) fetch_erfas(target=ns.cache_directory.erfa_info, url=ERFA_URL) if ns.update_erfalist or not os.path.isfile(ns.cache_directory.chaostreff_info): if os.path.exists(ns.cache_directory.chaostreff_info): os.unlink(ns.cache_directory.chaostreff_info) fetch_erfas(target=ns.cache_directory.chaostreff_info, url=CHAOSTREFF_URL) bbox = compute_bbox(ns) filter_boundingbox(ns.cache_directory.shapes_countries, ns.cache_directory.shapes_filtered, bbox) create_svg(ns, bbox) if __name__ == '__main__': main()