930 lines
36 KiB
Python
Executable file
930 lines
36 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# https://git.kabelsalat.ch/s3lph/erfamap
|
|
|
|
import abc
|
|
import os
|
|
import urllib.request
|
|
import urllib.parse
|
|
import json
|
|
import base64
|
|
import sys
|
|
import argparse
|
|
import shutil
|
|
import random
|
|
import math
|
|
|
|
from lxml import etree
|
|
import pyproj
|
|
from geopy import Nominatim
|
|
import tqdm
|
|
import cairosvg
|
|
from PIL import Image, ImageFont
|
|
from PIL.ImageDraw import ImageDraw
|
|
|
|
|
|
# User-Agent string identifying this tool; sent with the Wikidata SPARQL request
# and passed to the Nominatim geocoder.
USER_AGENT = 'Erfamap/v0.1 (https://git.kabelsalat.ch/s3lph/erfamap)'
|
|
|
|
|
|
class CachePaths:
    """Paths of all cache entries that live below one cache directory.

    Creating an instance creates the directory when it does not exist yet.
    The individual entries are only named here, not created.
    """

    def __init__(self, path: str):
        """Prepare the cache directory and derive the entry paths.

        :param path: Cache directory; created (including parents) if missing.
        :raises AttributeError: If ``path`` exists but is not a directory.
        """
        if not os.path.isdir(path) and os.path.exists(path):
            raise AttributeError(f'Path exists but is not a directory: {path}')
        os.makedirs(path, exist_ok=True)

        # Attribute name -> entry name inside the cache directory.
        entries = (
            ('shapes_states', 'shapes_states'),
            ('shapes_countries', 'shapes_countries'),
            ('shapes_filtered', 'shapes_filtered'),
            ('erfa_info', 'erfa-info.json'),
            ('chaostreff_info', 'chaostreff-info.json'),
        )
        for attribute, entry in entries:
            setattr(self, attribute, os.path.join(path, entry))
|
|
|
|
|
|
class OutputPaths:
    """Paths of the generated files below one output directory.

    Creating an instance creates the directory when it does not exist yet.
    """

    def __init__(self, path: str):
        """Prepare the output directory and derive the output file paths.

        :param path: Output directory; created (including parents) if missing.
        :raises AttributeError: If ``path`` exists but is not a directory.
        """
        if not os.path.isdir(path) and os.path.exists(path):
            raise AttributeError(f'Path exists but is not a directory: {path}')
        os.makedirs(path, exist_ok=True)

        self.path = path
        # Attribute name -> file name inside the output directory.
        outputs = (
            ('svg_path', 'map.svg'),
            ('png_path', 'map.png'),
            ('html_path', 'erfamap.html'),
            ('imagemap_path', 'imagemap.html'),
        )
        for attribute, filename in outputs:
            setattr(self, attribute, os.path.join(path, filename))

    def rel(self, path: str):
        """Return ``path`` expressed relative to the output directory."""
        base = self.path
        return os.path.relpath(path, start=base)
|
|
|
|
|
|
class CoordinateTransform:
    """Callable mapping ``(lon, lat)`` to scaled image coordinates.

    The projected coordinates are shifted so that the north-west corner of the
    configured bounding box becomes the origin, and the y axis is flipped so
    that it grows towards the south.
    """

    def __init__(self, projection: str):
        """Create the transformer from EPSG:4326 to ``projection``.

        :param projection: Target CRS, in any form accepted by
            ``pyproj.Transformer.from_crs``.
        """
        # NOTE(review): the transformer is created without ``always_xy=True``
        # while this class passes (lon, lat) and treats the result as (x, y).
        # Confirm the axis order for projections other than the default.
        self._proj = pyproj.Transformer.from_crs('epsg:4326', projection)
        self.setup()

    def setup(self, scalex=1.0, scaley=1.0, bbox=None):
        """Configure scale and origin, and return the extent of ``bbox``.

        :param scalex: Factor applied to the projected x offset.
        :param scaley: Factor applied to the projected y offset.
        :param bbox: Two ``(lon, lat)`` corners in any order. Defaults to
            ``[(-180, -90), (180, 90)]``.
        :returns: Transformed coordinates of the south-east corner, i.e. the
            width and height of the bounding box in image coordinates.
        """
        if bbox is None:
            # Built per call so that no mutable default is shared between calls.
            bbox = [(-180, -90), (180, 90)]
        self._scalex = scalex
        self._scaley = scaley
        west = min(bbox[0][0], bbox[1][0])
        north = max(bbox[0][1], bbox[1][1])
        east = max(bbox[0][0], bbox[1][0])
        south = min(bbox[0][1], bbox[1][1])
        # The north-west corner becomes the origin of the image coordinates.
        self._ox, self._oy = self._proj.transform(west, north)
        return self(east, south)

    def __call__(self, lon, lat):
        """Return the image coordinates ``(x, y)`` of the given position."""
        xt, yt = self._proj.transform(lon, lat)
        return (
            (xt - self._ox) * self._scalex,
            (self._oy - yt) * self._scaley
        )
|
|
|
|
|
|
class Drawable(abc.ABC):
    """Common base of the objects that are drawn into the SVG tree."""

    def render(self, svg, texts):
        """Draw this object into ``svg``; the base implementation does nothing.

        :param svg: Element that receives the rendered child elements.
        :param texts: Label layout handed to every drawable.
        """
        return None
|
|
|
|
|
|
class Country(Drawable):
    """Outline of a country, fetched from Wikidata geoshapes and drawn as SVG polygons.

    Subclasses select other entities by overriding ``SPARQL_QUERY`` and another
    CSS class by overriding ``SVG_CLASS``.
    """

    # CSS class set on every rendered <polygon>.
    SVG_CLASS = 'country'
    # Query returning one row per (entity, geoshape URL) pair.
    SPARQL_QUERY = '''

PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX p: <http://www.wikidata.org/prop/>
PREFIX ps: <http://www.wikidata.org/prop/statement/>
PREFIX wikibase: <http://wikiba.se/ontology#>

SELECT DISTINCT ?item ?map WHERE {
# ?item is instance of sovereign state, transitively part of europe and has geoshape ?map
# ?item is instance of country or sovereign state
?item wdt:P31 ?stateclass.
# ?item is transitively part of Europe (Contintent) or EEA
?item wdt:P361+ ?euroclass.
# ?item has geoshape ?map (including all non-deprecated results)
?item p:P3896 ?st.
?st ps:P3896 ?map;
MINUS { ?st wikibase:rank wikibase:DeprecatedRank }
FILTER (?stateclass = wd:Q6256 || ?stateclass = wd:Q3624078).
FILTER (?euroclass = wd:Q46 || ?euroclass = wd:Q8932).
}
'''

    def __init__(self, ns, name, polygons):
        """Store the name and project all polygon points.

        :param ns: Parsed arguments; ``ns.projection`` is called as
            ``ns.projection(lon, lat)`` for every point.
        :param name: Name written to the ``data-country`` attribute.
        :param polygons: List of polygons, each a list of ``(lon, lat)`` pairs.
        """
        self.name = name
        self.polygons = [[ns.projection(lon, lat) for lon, lat in p] for p in polygons]

    def __len__(self):
        """Return the total number of points over all polygons."""
        return sum(len(p) for p in self.polygons)

    def render(self, svg, texts):
        """Append one ``<polygon>`` element per polygon to ``svg``; ``texts`` is unused."""
        for polygon in self.polygons:
            points = ' '.join(f'{x},{y}' for x, y in polygon)
            poly = etree.Element('polygon', points=points)
            poly.set('class', self.__class__.SVG_CLASS)
            poly.set('data-country', self.name)
            svg.append(poly)

    @staticmethod
    def _parse_bindings(resultset):
        """Return the ``(entity id, geoshape URL)`` pairs of a SPARQL JSON result.

        Rows lacking either the ``item`` or the ``map`` binding are skipped, and
        a result without bindings yields an empty list.
        """
        pairs = []
        for binding in resultset.get('results', {}).get('bindings') or []:
            item = binding.get('item')
            shape = binding.get('map')
            if item is None or shape is None:
                continue
            # The entity id (e.g. Q183) is the last path segment of the entity URI.
            pairs.append((os.path.basename(item['value']), shape['value']))
        return pairs

    @staticmethod
    def _preferred_shape(shapes):
        """Return the shape whose ``zoom`` is closest to 4; shapes without zoom rank last."""
        return min(shapes, key=lambda x: abs(4 - x.get('zoom', 1000)))

    @classmethod
    def _query_shape_urls(cls):
        """Run ``SPARQL_QUERY`` against Wikidata and return all ``(entity id, URL)`` rows."""
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': USER_AGENT,
            'Accept': 'application/sparql-results+json',
        }
        body = urllib.parse.urlencode({'query': cls.SPARQL_QUERY}).encode()
        req = urllib.request.Request('https://query.wikidata.org/sparql', headers=headers, data=body)
        with urllib.request.urlopen(req) as resp:
            resultset = json.load(resp)
        return cls._parse_bindings(resultset)

    @classmethod
    def sparql_query(cls):
        """Return a dict mapping each entity id to one geoshape URL.

        When an entity has several geoshapes, the last row wins.
        """
        return dict(cls._query_shape_urls())

    @classmethod
    def fetch(cls, target):
        """Download the geoshapes and store one JSON file per entity in ``target``.

        Only shapes whose ``license`` starts with ``CC0-`` are kept. When an
        entity offers several such shapes, the one closest to zoom level 4 is
        stored. HTTP errors are printed and the affected shape is skipped.

        :param target: Directory receiving ``<entity id>.json``; created if missing.
        """
        os.makedirs(target, exist_ok=True)
        # Iterate over every result row rather than one URL per entity, so that
        # the zoom level preference below has something to choose from.
        shape_urls = cls._query_shape_urls()
        candidates = {}
        for item, url in tqdm.tqdm(shape_urls):
            try:
                with urllib.request.urlopen(url) as resp:
                    shape = json.load(resp)
            except urllib.error.HTTPError as e:
                print(e)
                continue
            if not shape.get('license', 'proprietary').startswith('CC0-'):
                # Only include public domain data
                continue
            candidates.setdefault(item, []).append(shape)
        for item, shapes in candidates.items():
            with open(os.path.join(target, item + '.json'), 'w') as f:
                json.dump(cls._preferred_shape(shapes), f)

    @classmethod
    def from_cache(cls, ns, source):
        """Load one instance per ``*.json`` file in ``source``.

        The name is read from ``description.en``. Only the first feature of each
        file is used, and only the first ring of each of its polygons.
        """
        countries = []
        for f in os.listdir(source):
            if not f.endswith('.json'):
                continue
            with open(os.path.join(source, f), 'r') as sf:
                shape = json.load(sf)
            name = shape['description']['en']
            geo = shape['data']['features'][0]['geometry']
            coordinates = geo['coordinates']
            if geo['type'] == 'Polygon':
                # Treat a single polygon like a multi-polygon with one member.
                coordinates = [coordinates]
            polygons = [poly[0] for poly in coordinates]
            countries.append(cls(ns, name, polygons))
        return countries

    @classmethod
    def filter_boundingbox(cls, ns, source, target, bbox):
        """Copy every shape file with at least one point inside ``bbox`` to ``target``.

        ``Q183.json`` is always skipped. Files are copied verbatim.

        :param ns: Unused.
        :param source: Directory containing the cached shape files.
        :param target: Directory receiving the kept files; created if missing.
        :param bbox: ``[(min_lon, min_lat), (max_lon, max_lat)]``.
        """
        files = os.listdir(source)
        os.makedirs(target, exist_ok=True)
        west, south = bbox[0][0], bbox[0][1]
        east, north = bbox[1][0], bbox[1][1]
        for f in tqdm.tqdm(files):
            if not f.endswith('.json') or 'Q183.json' in f:
                continue
            with open(os.path.join(source, f), 'r') as sf:
                shapedata = sf.read()
            geo = json.loads(shapedata)['data']['features'][0]['geometry']
            coordinates = geo['coordinates']
            if geo['type'] == 'Polygon':
                coordinates = [coordinates]
            keep = any(
                west <= point[0] <= east and south <= point[1] <= north
                for poly in coordinates
                for point in poly[0]
            )
            if keep:
                with open(os.path.join(target, f), 'w') as sf:
                    sf.write(shapedata)
|
|
|
|
|
|
|
|
class FederalState(Country):
    """Outline of a federal state of Germany.

    Only the CSS class and the Wikidata query differ from ``Country``; all
    methods are inherited.
    """

    # CSS class set on every rendered <polygon>.
    SVG_CLASS = 'state'
    # Query selecting the entities to fetch together with their geoshape.
    SPARQL_QUERY = '''

PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>

SELECT DISTINCT ?item ?map WHERE {
# ?item is instance of federal state of germany and has geoshape ?map
?item wdt:P31 wd:Q1221156;
wdt:P3896 ?map
}
'''
|
|
|
|
|
|
class Erfa(Drawable):
    """A space drawn as a dot on the map, optionally with a text label.

    Instances are created from the Semantic MediaWiki API of doku.ccc.de
    (``fetch``) or from the JSON cache written by ``fetch`` (``from_cache``).
    Subclasses adjust the queried category and the SVG attributes through the
    class constants.
    """

    SMW_URL = 'https://doku.ccc.de/index.php?title=Spezial:Semantische_Suche&x=-5B-5BKategorie%3A{category}-5D-5D-20-5B-5BChaostreff-2DActive%3A%3Awahr-5D-5D%2F-3FChaostreff-2DCity%2F-3FChaostreff-2DPhysical-2DAddress%2F-3FChaostreff-2DPhysical-2DHousenumber%2F-3FChaostreff-2DPhysical-2DPostcode%2F-3FChaostreff-2DPhysical-2DCity%2F-3FChaostreff-2DCountry%2F-3FPublic-2DWeb%2F-3FChaostreff-2DLongname%2F-3FChaostreff-2DNickname%2F-3FChaostreff-2DRealname&format=json&limit={limit}&link=all&headers=show&searchlabel=JSON&class=sortable+wikitable+smwtable&sort=&order=asc&offset={offset}&mainlabel=&prettyprint=true&unescape=true'

    # Number of results requested per API page.
    SMW_REQUEST_LIMIT = 50

    # Category inserted into SMW_URL.
    SMW_CATEGORY = 'Erfa-2DKreise'

    # CSS class of the dot, name of the data attribute, whether a label is
    # rendered, and CSS class of the label.
    SVG_CLASS = 'erfa'
    SVG_DATA = 'data-erfa'
    SVG_LABEL = True
    SVG_LABELCLASS = 'erfalabel'

    def __init__(self, ns, name, city, lon, lat, display_name=None, web=None, radius=15):
        """Store the attributes and project the location.

        :param ns: Parsed arguments; ``ns.projection(lon, lat)`` yields ``x, y``.
        :param name: Identifying name; two instances with equal names are equal.
        :param city: City name; used as label key and data attribute value.
        :param lon: Longitude of the location.
        :param lat: Latitude of the location.
        :param display_name: Tooltip text; defaults to ``name``.
        :param web: Optional URL the dot links to.
        :param radius: Radius of the rendered dot.
        """
        self.name = name
        self.city = city
        self.lon = lon
        self.lat = lat
        self.x, self.y = ns.projection(lon, lat)
        self.display_name = display_name if display_name is not None else name
        self.web = web
        self.radius = radius

    def __eq__(self, o):
        """Compare by ``name``; objects that are not an ``Erfa`` are never equal."""
        if not isinstance(o, Erfa):
            return False
        return self.name == o.name

    def __hash__(self):
        """Hash by ``name``, consistent with ``__eq__``."""
        return hash(self.name)

    def render(self, svg, texts):
        """Append the dot, and the label if one was laid out, to ``svg``.

        :param svg: Element receiving the rendered group.
        :param texts: Mapping from city to the label box chosen for it; the box
            provides ``left``, ``top`` and ``meta['baseline']`` / ``meta['text']``.
        """
        cls = self.__class__
        # Dots with a website become links, all others plain groups.
        if self.web is not None:
            group = etree.Element('a', href=self.web, target='_blank')
        else:
            group = etree.Element('g')
        group.set(cls.SVG_DATA, self.city)

        circle = etree.Element('circle', cx=str(self.x), cy=str(self.y), r=str(self.radius))
        circle.set('class', cls.SVG_CLASS)
        circle.set(cls.SVG_DATA, self.city)
        title = etree.Element('title')
        title.text = self.display_name
        circle.append(title)
        group.append(circle)

        # Classes with SVG_LABEL = False never render a label, even when their
        # city shares a label with another dot.
        if cls.SVG_LABEL and self.city in texts:
            box = texts[self.city]
            text = etree.Element('text', x=str(box.left), y=str(box.top + box.meta['baseline']))
            text.set('class', cls.SVG_LABELCLASS)
            text.set(cls.SVG_DATA, self.city)
            text.text = box.meta['text']
            group.append(text)
        svg.append(group)

    @classmethod
    def address_lookup(cls, attr, name=None):
        """Geocode the address found in the SMW printouts via Nominatim.

        :param attr: Printouts of one result; every value is a list.
        :param name: Optional name used in the failure message; the city is
            used when omitted.
        :returns: ``(longitude, latitude)``, or ``None`` if no address format
            produced a result.
        """
        locator = Nominatim(user_agent=USER_AGENT)
        number = attr['Chaostreff-Physical-Housenumber']
        street = attr['Chaostreff-Physical-Address']
        zipcode = attr['Chaostreff-Physical-Postcode']
        acity = attr['Chaostreff-Physical-City']
        city = attr['Chaostreff-City'][0]
        country = attr['Chaostreff-Country'][0]

        # Try the most accurate address first, try increasingly inaccurate addresses on failure.
        formats = [
            # Muttenz, Schweiz
            f'{city}, {country}'
        ]
        if zipcode and acity:
            # 4132 Muttenz, Schweiz
            formats.insert(0, f'{zipcode[0]} {acity[0]}, {country}')
        if zipcode and acity and number and street:
            # Birsfelderstrasse 6, 4132 Muttenz, Schweiz
            formats.insert(0, f'{street[0]} {number[0]}, {zipcode[0]} {acity[0]}, {country}')

        for fmt in formats:
            response = locator.geocode(fmt)
            if response is not None:
                return response.longitude, response.latitude

        label = city if name is None else name
        print(f'No location found for {label}, tried the following address formats:')
        for fmt in formats:
            print(f'  {fmt}')
        return None

    @classmethod
    def from_api(cls, name, attr, radius, ns):
        """Create an instance from one SMW API result.

        :param name: Page name of the result.
        :param attr: Printouts of the result.
        :param radius: Radius of the rendered dot.
        :param ns: Parsed arguments, passed on to the constructor.
        :raises ValueError: If the address could not be geocoded.
        """
        city = attr['Chaostreff-City'][0]
        location = cls.address_lookup(attr, name)
        if location is None:
            raise ValueError(f'No location for {name}')
        lon, lat = location

        # There are up to 4 different names: 3 SMW attrs and the page name
        display_name = name
        for key in ('Chaostreff-Longname', 'Chaostreff-Nickname', 'Chaostreff-Realname'):
            if len(attr[key]) > 0:
                display_name = attr[key][0]
                break
        web = attr['Public-Web'][0] if len(attr['Public-Web']) > 0 else None
        return cls(ns, name, city, lon, lat, display_name, web, radius)

    def to_dict(self):
        """Return the JSON-serializable representation stored in the cache."""
        return {
            'name': self.name,
            'city': self.city,
            'web': self.web,
            'display_name': self.display_name,
            'location': [self.lon, self.lat]
        }

    @classmethod
    def from_dict(cls, ns, dct, radius):
        """Create an instance of the class this is called on from a ``to_dict`` result."""
        name = dct['name']
        city = dct['city']
        lon, lat = dct['location']
        display_name = dct.get('display_name', name)
        web = dct.get('web')
        return cls(ns, name, city, lon, lat, display_name, web, radius)

    @classmethod
    def fetch(cls, ns, target, radius):
        """Fetch all entries of ``SMW_CATEGORY``, geocode them and write the cache.

        Credentials are read from the environment variable
        ``DOKU_CCC_DE_BASICAUTH`` (``username:password``); the process exits
        with status 1 if it is unset. Entries that cannot be converted are
        reported and skipped.

        :param ns: Parsed arguments, passed on to the constructor.
        :param target: Path of the JSON cache file to write.
        :param radius: Radius of the rendered dots.
        :returns: List of the created instances.
        """
        userpw = os.getenv('DOKU_CCC_DE_BASICAUTH')
        if userpw is None:
            print('Please set environment variable DOKU_CCC_DE_BASICAUTH=username:password')
            sys.exit(1)
        auth = base64.b64encode(userpw.encode()).decode()
        erfas = []

        offset = 0
        while True:
            url = cls.SMW_URL.format(
                category=cls.SMW_CATEGORY,
                limit=cls.SMW_REQUEST_LIMIT,
                offset=offset
            )
            req = urllib.request.Request(url, headers={'Authorization': f'Basic {auth}'})
            with urllib.request.urlopen(req) as resp:
                body = resp.read().decode()
                if len(body) == 0:
                    # All pages read, new response is empty
                    break
                erfadata = json.loads(body)
                if erfadata['rows'] == 0:
                    break
                offset += erfadata['rows']
                for name, attr in erfadata['results'].items():
                    try:
                        # Argument order must match from_api(name, attr, radius, ns).
                        erfa = cls.from_api(name, attr['printouts'], radius, ns)
                        erfas.append(erfa)
                    except Exception as e:
                        # Best effort: report the broken entry and go on, but let
                        # KeyboardInterrupt and SystemExit propagate.
                        print(e)
                        continue

        # Save to cache
        with open(target, 'w') as f:
            json.dump([e.to_dict() for e in erfas], f)
        return erfas

    @classmethod
    def from_cache(cls, ns, source, radius):
        """Load instances of the class this is called on from the cache file ``source``."""
        with open(source, 'r') as f:
            data = json.load(f)
        return [cls.from_dict(ns, d, radius) for d in data]
|
|
|
|
|
|
class Chaostreff(Erfa):
    """Variant of ``Erfa`` with a different wiki category and different SVG attributes.

    Only class constants are overridden; all methods are inherited.
    """

    # Category inserted into the inherited SMW_URL.
    SMW_CATEGORY = 'Chaostreffs'

    # CSS class of the dot and name of the data attribute.
    SVG_CLASS = 'chaostreff'
    SVG_DATA = 'data-chaostreff'
    # NOTE(review): not read anywhere in this file; presumably names the
    # argument holding the dot radius - confirm before relying on it.
    SVG_DOTSIZE_ATTR = 'dotsize_treff'
    SVG_LABEL = False
|
|
|
|
|
|
class BoundingBox:
    """Axis-aligned rectangle used as a placement candidate for a text label.

    Besides its geometry a box carries arbitrary ``meta`` data, a static
    ``base_weight`` and a dynamic weight that ``compute_weight`` derives from
    collisions with and distances to other boxes and dots. Lower is better.
    """

    def __init__(self, left, top, right=None, bottom=None, *, width=None, height=None, meta=None, base_weight=0):
        """Create a box from two corners, or from one corner and a size.

        :param left: x coordinate of the left edge.
        :param top: y coordinate of the top edge.
        :param right: x coordinate of the right edge; ignored if ``width`` is given.
        :param bottom: y coordinate of the bottom edge; ignored if ``height`` is given.
        :param width: Width of the box; takes precedence over ``right``.
        :param height: Height of the box; takes precedence over ``bottom``.
        :param meta: Arbitrary data attached to the box.
        :param base_weight: Constant added to the computed weight.
        """
        self.left = left
        self.top = top
        self.right = right if width is None else left + width
        self.bottom = bottom if height is None else top + height
        self.meta = meta
        self.base_weight = base_weight
        self.finished = False
        self._weight = 0
        self._optimal = True

    @staticmethod
    def _check_offset(other):
        """Raise ``TypeError`` unless ``other`` is a tuple or a list."""
        if not isinstance(other, (tuple, list)):
            raise TypeError()

    def __contains__(self, other):
        """Return whether a box intersects, or a point lies within, this box.

        Touching edges count as intersecting/contained.

        :param other: A ``BoundingBox`` or an ``(x, y)`` tuple or list.
        :raises TypeError: For any other type or a sequence of the wrong length.
        """
        if isinstance(other, BoundingBox):
            if other.right < self.left or other.left > self.right:
                return False
            if other.bottom < self.top or other.top > self.bottom:
                return False
            return True
        if isinstance(other, (tuple, list)):
            if len(other) != 2:
                raise TypeError()
            if self.left > other[0] or self.right < other[0]:
                return False
            if self.top > other[1] or self.bottom < other[1]:
                return False
            return True
        raise TypeError()

    def __add__(self, other):
        """Return a copy moved by the ``(dx, dy)`` offset ``other``."""
        self._check_offset(other)
        return BoundingBox(self.left + other[0], self.top + other[1],
                           self.right + other[0], self.bottom + other[1],
                           meta=self.meta, base_weight=self.base_weight)

    def __iadd__(self, other):
        """Move this box by the ``(dx, dy)`` offset ``other``."""
        self._check_offset(other)
        self.left += other[0]
        self.top += other[1]
        self.right += other[0]
        self.bottom += other[1]
        return self

    def __sub__(self, other):
        """Return a copy moved by the negated ``(dx, dy)`` offset ``other``."""
        self._check_offset(other)
        return BoundingBox(self.left - other[0], self.top - other[1],
                           self.right - other[0], self.bottom - other[1],
                           meta=self.meta, base_weight=self.base_weight)

    def __isub__(self, other):
        """Move this box by the negated ``(dx, dy)`` offset ``other``.

        Like ``__iadd__`` this leaves the computed weight untouched, so that
        ``weight`` stays a number.
        """
        self._check_offset(other)
        self.left -= other[0]
        self.top -= other[1]
        self.right -= other[0]
        self.bottom -= other[1]
        return self

    @property
    def width(self):
        """Horizontal extent of the box."""
        return self.right - self.left

    @property
    def height(self):
        """Vertical extent of the box."""
        return self.bottom - self.top

    @property
    def center(self):
        """Center point ``(x, y)`` of the box."""
        return (self.left + self.width/2, self.top + self.height/2)

    def distance2(self, other):
        """Return the Euclidean distance between the centers of both boxes.

        Despite the name the result is not squared.
        """
        c1 = self.center
        c2 = other.center
        return math.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2)

    def with_margin(self, margin):
        """Return a copy grown by ``margin`` on all four sides."""
        return BoundingBox(self.left - margin, self.top - margin,
                           self.right + margin, self.bottom + margin,
                           meta=self.meta, base_weight=self.base_weight)

    def chebyshev_distance(self, other):
        """Return the Chebyshev distance between this box and a box or point.

        The distance is the larger of the horizontal and the vertical gap, and
        0 if ``other`` intersects (box) or lies within (point) this box.

        :param other: A ``BoundingBox`` or an ``(x, y)`` tuple or list.
        :raises TypeError: For any other type.
        """
        # https://en.wikipedia.org/wiki/Chebyshev_distance
        if other in self:
            return 0
        if isinstance(other, BoundingBox):
            o_left, o_right = other.left, other.right
            o_top, o_bottom = other.top, other.bottom
        elif isinstance(other, (tuple, list)):
            # A point is a box without extent.
            o_left = o_right = other[0]
            o_top = o_bottom = other[1]
        else:
            raise TypeError()
        # The gap along an axis is 0 whenever the two intervals overlap, which
        # includes one interval lying entirely inside the other.
        dh = max(0, o_left - self.right, self.left - o_right)
        dv = max(0, o_top - self.bottom, self.top - o_bottom)
        return max(dh, dv)

    def compute_weight(self, other, erfas, chaostreffs, pdist, ns):
        """Recompute the dynamic weight and the optimality flag of this box.

        :param other: Boxes to check against; boxes whose ``meta['erfa']``
            equals this box's are skipped.
        :param erfas: Dots providing ``x`` and ``y``; checked against this box
            grown by ``ns.dotsize_erfa``.
        :param chaostreffs: Dots providing ``x`` and ``y``; checked against this
            box grown by ``ns.dotsize_treff``.
        :param pdist: Distance below which closeness is penalized.
        :param ns: Parsed arguments providing ``dotsize_erfa`` and ``dotsize_treff``.
        """
        w = 0
        self._optimal = True
        swm = self.with_margin(pdist)
        swe = self.with_margin(ns.dotsize_erfa)
        swc = self.with_margin(ns.dotsize_treff)
        maxs = []
        # I hope these weights are somewhat reasonably balanced...
        # Basically the weights correspond to geometrical distances,
        # except for an actual collision, which gets a huge extra weight.
        for o in other:
            if o.meta['erfa'] == self.meta['erfa']:
                continue
            if o in self:
                # Colliding with an already placed label is far worse than
                # colliding with a mere candidate.
                w += 1000 if o.finished else 50
                self._optimal = False
            else:
                maxs.append(max(pdist*2 - swm.chebyshev_distance(o), 0))
        for dots, padded in ((erfas, swe), (chaostreffs, swc)):
            for dot in dots:
                if dot == self.meta['erfa']:
                    continue
                location = (dot.x, dot.y)
                if location in padded:
                    w += 1000
                    self._optimal = False
                else:
                    maxs.append(max(pdist*2 - padded.chebyshev_distance(location), 0))
        # Only the closest non-colliding neighbour contributes.
        if len(maxs) > 0:
            w += max(maxs)
        self._weight = w

    @property
    def weight(self):
        """Sum of the computed weight and ``base_weight``."""
        return self._weight + self.base_weight

    @property
    def is_optimal(self):
        """Whether the last ``compute_weight`` call found no collision."""
        # Candidate is considered optimal if it doesn't collide with anything else
        return self._optimal

    def __str__(self):
        """Return the integer-truncated coordinates and the weight."""
        return f'(({int(self.left)}, {int(self.top)}, {int(self.right)}, {int(self.bottom)}), weight={self.weight})'
|
|
|
|
|
|
def compute_bbox(ns):
    """Return the map bounding box as ``[(min_lon, min_lat), (max_lon, max_lat)]``.

    An explicit ``ns.bbox`` (four values: lon, lat, lon, lat) is only
    normalized. Otherwise the box is inferred from the cached Erfa and
    Chaostreff locations and grown by ``ns.bbox_margin`` on every side.

    :param ns: Parsed arguments providing ``bbox``, ``bbox_margin`` and
        ``cache_directory``.
    :raises ValueError: If no bounding box is given and the caches contain no
        location to infer one from.
    """
    if ns.bbox is not None:
        lons = (ns.bbox[0], ns.bbox[2])
        lats = (ns.bbox[1], ns.bbox[3])
        return [(min(lons), min(lats)), (max(lons), max(lats))]

    print('Computing map bounding box')
    lons = []
    lats = []
    for path in tqdm.tqdm([ns.cache_directory.erfa_info, ns.cache_directory.chaostreff_info]):
        # The radius is irrelevant here, only the locations are needed.
        for e in Erfa.from_cache(ns, path, radius=0):
            lons.append(e.lon)
            lats.append(e.lat)
    if not lons:
        # Fail with a clear message instead of an IndexError further down.
        raise ValueError('Cannot infer the map bounding box: the caches contain no locations')
    margin = ns.bbox_margin
    return [
        (min(lons) - margin, min(lats) - margin),
        (max(lons) + margin, max(lats) + margin)
    ]
|
|
|
|
|
|
def optimize_text_layout(ns, erfas, chaostreffs, width, svg):
    """Choose a label position for every Erfa city.

    For each Erfa a discrete set of candidate boxes around its dot is
    generated. One candidate per city is then chosen greedily, and the result
    is refined by repeatedly re-selecting the lowest-weight candidate of each
    city until an iteration changes nothing.

    :param ns: Parsed arguments; reads ``font``, ``font_size``, ``rename``,
        ``font_min_distance``, ``font_max_distance``, ``font_step_distance``,
        ``dotsize_erfa`` and ``debug``.
    :param erfas: Dots that receive a label.
    :param chaostreffs: Dots without label that labels must avoid.
    :param width: Width of the image; dots beyond 80% of it prefer a label on
        their left side.
    :param svg: SVG element receiving the debug rectangles if ``ns.debug`` is set.
    :returns: Dict mapping each city to the chosen ``BoundingBox``.
    """

    # Load the font and measure its various different heights
    font = ImageFont.truetype(ns.font, ns.font_size)
    pil = ImageDraw(Image.new('P', (1, 1)))
    xheight = -pil.textbbox((0, 0), 'x', font=font, anchor='ls')[1]
    capheight = -pil.textbbox((0, 0), 'A', font=font, anchor='ls')[1]
    voffset = -capheight / 2
    pdist = max(ns.font_min_distance, xheight)

    # Generate a discrete set of text placement candidates around each erfa dot
    candidates = {}
    for erfa in erfas:
        city = erfa.city
        text = erfa.city
        for rfrom, to in ns.rename:
            if rfrom == city:
                text = to
                break

        meta = {'erfa': erfa, 'text': text, 'baseline': capheight}
        textbox = pil.textbbox((0, 0), text, font=font, anchor='ls')  # left, baseline at 0,0
        mw, mh = textbox[2] - textbox[0], textbox[3] - textbox[1]
        candidates[city] = []

        # Iterate over the dot-to-text distance range in discrete steps
        steps = max(0, ns.font_step_distance)
        for j in range(steps + 1):
            if steps > 0:
                dist = ns.font_min_distance + (ns.font_max_distance - ns.font_min_distance) * j / steps
            else:
                # Without steps only the minimal distance is tried; this also
                # avoids dividing by zero.
                dist = ns.font_min_distance
            bw = dist
            # Generate 3 candidates each left and right of the dot, with varying vertical offset
            for i in range(-1, 2):
                if i == 0:
                    bw -= 0.003
                bwl, bwr = bw, bw
                # Slightly penalize the side pointing towards the image border.
                if erfa.x > 0.8 * width:
                    bwr = bw + 0.001
                else:
                    bwl = bw + 0.001
                candidates[city].append(BoundingBox(erfa.x - dist - mw, erfa.y + voffset + ns.dotsize_erfa*i*2, width=mw, height=mh, meta=meta, base_weight=bwl))
                candidates[city].append(BoundingBox(erfa.x + dist, erfa.y + voffset + ns.dotsize_erfa*i*2, width=mw, height=mh, meta=meta, base_weight=bwr))
            # Generate 3 candidates each above and beneath the dot, aligned left, centered and right
            candidates[city].extend([
                BoundingBox(erfa.x - mw/2, erfa.y - dist - mh, width=mw, height=mh, meta=meta, base_weight=bw + 0.001),
                BoundingBox(erfa.x - mw/2, erfa.y + dist, width=mw, height=mh, meta=meta, base_weight=bw + 0.001),
                BoundingBox(erfa.x - ns.dotsize_erfa, erfa.y - dist - mh, width=mw, height=mh, meta=meta, base_weight=bw + 0.002),
                BoundingBox(erfa.x - ns.dotsize_erfa, erfa.y + dist, width=mw, height=mh, meta=meta, base_weight=bw + 0.003),
                BoundingBox(erfa.x + ns.dotsize_erfa - mw, erfa.y - dist - mh, width=mw, height=mh, meta=meta, base_weight=bw + 0.002),
                BoundingBox(erfa.x + ns.dotsize_erfa - mw, erfa.y + dist, width=mw, height=mh, meta=meta, base_weight=bw + 0.003),
            ])

        # If debugging is enabled, render one rectangle around each label's bounding box, and one rectangle around each label's median box
        if ns.debug:
            for c in candidates[city]:
                di = etree.Element('rect', x=str(c.left), y=str(c.top + capheight - xheight), width=str(c.width), height=str(xheight))
                di.set('class', 'debugright')
                dr = etree.Element('rect', x=str(c.left), y=str(c.top), width=str(c.width), height=str(c.height))
                dr.set('class', 'debugleft')
                svg.append(di)
                svg.append(dr)

    unfinished = {e.city for e in erfas}
    finished = {}

    # Greedily choose a candidate for each label
    with tqdm.tqdm(total=len(unfinished)) as progress:
        while len(unfinished) > 0:
            all_boxes = set(finished.values())
            unfinished_boxes = set()
            for city in unfinished:
                all_boxes.update(candidates[city])
                unfinished_boxes.update(candidates[city])
            for box in all_boxes:
                box.compute_weight(all_boxes, erfas, chaostreffs, pdist=pdist, ns=ns)

            # Get candidate with the least number of optimal solutions > 0
            optcity = min(unfinished, key=lambda city: len([1 for box in candidates[city] if box.is_optimal]) or float('inf'))
            optboxes = [box for box in candidates[optcity] if box.is_optimal]
            if len(optboxes) > 0:
                chosen_city = optcity
                chosen = min(optboxes, key=lambda box: box.weight)
            else:
                # If no candidate with at least one optimal solution is left, go by global minimum
                chosen = min(unfinished_boxes, key=lambda box: box.weight)
                chosen_city = chosen.meta['erfa'].city
            finished[chosen_city] = chosen
            chosen.finished = True
            unfinished.discard(chosen_city)
            # update() takes an increment: exactly one label was placed.
            progress.update(1)

    # Iteratively improve the layout
    for _ in tqdm.tqdm(range(len(finished))):
        changed = False
        order = list(finished.keys())
        random.shuffle(order)
        for city in order:
            all_boxes = set(finished.values())
            for box in candidates[city]:
                box.compute_weight(all_boxes, erfas, chaostreffs, pdist=pdist, ns=ns)
            minbox = min(candidates[city], key=lambda box: box.weight)
            if minbox is not finished[city]:
                changed = True
                finished[city] = minbox
                minbox.finished = True
        if not changed:
            # Nothing changed in this iteration - no need to retry
            break

    return finished
|
|
|
|
|
|
def create_imagemap(ns, size, parent, erfas, chaostreffs, texts):
    """Append an ``<img>`` of the PNG map and a matching ``<map>`` to ``parent``.

    Every Erfa with a website gets a circular area over its dot and, if a
    label was laid out for its city, a rectangular area over the label. All
    coordinates are multiplied by ``ns.png_scale``.

    :param ns: Parsed arguments providing ``png_scale`` and ``output_directory``.
    :param size: ``(width, height)`` of the unscaled image.
    :param parent: Element receiving the two new elements.
    :param erfas: Dots to create areas for.
    :param chaostreffs: Not used by this function.
    :param texts: Mapping from city to the label box chosen for it.
    """
    # NOTE(review): chaostreffs is accepted but ignored, so Chaostreff dots are
    # not clickable in the image map - confirm whether that is intended.
    scale = ns.png_scale
    out = ns.output_directory
    image = etree.Element('img',
                          src=out.rel(out.png_path),
                          usemap='#erfamap',
                          width=str(size[0]*scale), height=str(size[1]*scale))
    areas = etree.Element('map', name='erfamap')

    linked = (erfa for erfa in erfas if erfa.web is not None)
    for erfa in linked:
        # Collect (shape, coords) of all clickable regions of this Erfa first.
        regions = [('circle', f'{erfa.x*scale},{erfa.y*scale},{erfa.radius*scale}')]
        if erfa.city in texts:
            box = texts[erfa.city]
            regions.append(('rect', f'{box.left*scale},{box.top*scale},{box.right*scale},{box.bottom*scale}'))
        for shape, coords in regions:
            areas.append(etree.Element('area',
                                       shape=shape,
                                       coords=coords,
                                       href=erfa.web,
                                       title=erfa.display_name))

    parent.append(image)
    parent.append(areas)
|
|
|
|
|
|
def create_svg(ns, bbox):
    """Render the map and write the SVG, the PNG and both HTML pages.

    The stylesheet and the font are copied into the output directory below
    the same relative paths they were given as.

    :param ns: Parsed arguments.
    :param bbox: Map bounding box as ``[(min_lon, min_lat), (max_lon, max_lat)]``.
    """
    print('Creating SVG image')

    # Convert from WGS84 lon, lat to chosen projection
    svg_box = ns.projection.setup(ns.scale_x, ns.scale_y, bbox=bbox)
    # Extent [min x, min y, max x, max y] of everything that is drawn.
    rectbox = [0, 0, svg_box[0], svg_box[1]]

    # Load everything from cached JSON files
    countries = Country.from_cache(ns, ns.cache_directory.shapes_filtered)
    states = FederalState.from_cache(ns, ns.cache_directory.shapes_states)
    erfas = Erfa.from_cache(ns, ns.cache_directory.erfa_info, ns.dotsize_erfa)
    chaostreffs = Chaostreff.from_cache(ns, ns.cache_directory.chaostreff_info, ns.dotsize_treff)
    # There is an edge case where when a space changed states between Erfa and Chaostreff, the
    # Semantic MediaWiki engine returns this space as both an Erfa and a Chaostreff, resulting
    # in glitches in the rendering. As a workaround, here we simply assume that it's an Erfa.
    chaostreffs = [c for c in chaostreffs if c not in erfas]

    for c in states + countries:
        for poly in c.polygons:
            for x, y in poly:
                rectbox[0] = min(x, rectbox[0])
                rectbox[1] = min(y, rectbox[1])
                rectbox[2] = max(x, rectbox[2])
                rectbox[3] = max(y, rectbox[3])

    print('Copying stylesheet and font')
    for asset in (ns.stylesheet, ns.font):
        dst = os.path.join(ns.output_directory.path, asset)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copyfile(asset, dst)

    svg = etree.Element('svg',
                        xmlns='http://www.w3.org/2000/svg',
                        viewBox=f'0 0 {svg_box[0]} {svg_box[1]}',
                        width=str(svg_box[0]), height=str(svg_box[1]))

    style = etree.Element('style', type='text/css')
    style.text = f'@import url({ns.stylesheet})'
    svg.append(style)

    # The background covers the extent of all shapes: the width spans the x
    # range and the height the y range.
    bg = etree.Element('rect',
                       id='background',
                       x=str(rectbox[0]),
                       y=str(rectbox[1]),
                       width=str(rectbox[2]-rectbox[0]),
                       height=str(rectbox[3]-rectbox[1]))
    bg.set('class', 'background')
    svg.append(bg)

    # This can take some time, especially if lots of candidates are generated
    print('Layouting labels')
    texts = optimize_text_layout(ns, erfas, chaostreffs, width=svg_box[0], svg=svg)

    # Render shortest shapes last s.t. Berlin, Hamburg and Bremen are rendered on top of their surrounding states
    states = sorted(states, key=lambda x: -len(x))
    # Render country and state borders
    for c in countries + states:
        c.render(svg, texts)

    # Render Erfa dots and their labels
    for erfa in erfas + chaostreffs:
        erfa.render(svg, texts)

    # Generate SVG, PNG and HTML output files

    print('Writing SVG')
    with open(ns.output_directory.svg_path, 'wb') as mapfile:
        root = etree.ElementTree(svg)
        root.write(mapfile)

    print('Writing PNG')
    cairosvg.svg2png(url=ns.output_directory.svg_path, write_to=ns.output_directory.png_path,
                     scale=ns.png_scale)

    print('Writing HTML SVG page')
    html = etree.Element('html')
    head = etree.Element('head')
    link = etree.Element('link', rel='stylesheet', href=ns.stylesheet)
    head.append(link)
    html.append(head)
    body = etree.Element('body')
    obj = etree.Element('object',
                        data=ns.output_directory.rel(ns.output_directory.svg_path),
                        width=str(svg_box[0]), height=str(svg_box[1]))
    # The image map is placed inside the <object> element.
    create_imagemap(ns, svg_box, obj, erfas, chaostreffs, texts)
    body.append(obj)
    html.append(body)
    with open(ns.output_directory.html_path, 'wb') as f:
        f.write(b'<!DOCTYPE html>\n')
        etree.ElementTree(html).write(f)

    print('Writing HTML Image Map')
    html = etree.Element('html')
    head = etree.Element('head')
    link = etree.Element('link', rel='stylesheet', href=ns.stylesheet)
    head.append(link)
    html.append(head)
    body = etree.Element('body')
    html.append(body)

    create_imagemap(ns, svg_box, body, erfas, chaostreffs, texts)

    with open(ns.output_directory.imagemap_path, 'wb') as f:
        f.write(b'<!DOCTYPE html>\n')
        etree.ElementTree(html).write(f)
|
|
|
|
|
|
def main():
    """Parse the command line, refresh missing or outdated caches and create the map.

    Border shapes are fetched when ``--update-borders`` is given or their cache
    directory is missing; the Erfa and Chaostreff lists are fetched when
    ``--update-erfalist`` is given or their cache file is missing.
    """
    ap = argparse.ArgumentParser(sys.argv[0])
    ap.add_argument('--cache-directory', type=CachePaths, default='cache', help='Path to the cache directory')
    ap.add_argument('--output-directory', type=OutputPaths, default='out', help='Path to the output directory')
    ap.add_argument('--update-borders', action='store_true', default=False, help='Update country borders from Wikidata')
    ap.add_argument('--update-erfalist', action='store_true', default=False, help='Update Erfa/Chaostreff list from doku.ccc.de')
    ap.add_argument('--bbox', type=float, nargs=4, metavar=('LON', 'LAT', 'LON', 'LAT'), default=None, help='Override map bounding box')
    ap.add_argument('--bbox-margin', type=float, default=0.3, help='Margin around the inferred bounding box. Ignored with --bbox')
    ap.add_argument('--stylesheet', type=str, default='style/erfamap.css', help='Stylesheet for the generated SVG')
    ap.add_argument('--font', type=str, default='style/concertone-regular.ttf', help='Name of the font used in the stylesheet.')
    ap.add_argument('--font-size', type=int, default=45, help='Size of the font used in the stylesheet.')
    ap.add_argument('--font-min-distance', type=int, default=18, help='Minimal distance of labels from their dots center')
    ap.add_argument('--font-max-distance', type=int, default=40, help='Maximal distance of labels from their dots center')
    ap.add_argument('--font-step-distance', type=int, default=3, help='Distance steps of labels from their dots center')
    ap.add_argument('--dotsize-erfa', type=float, default=13, help='Radius of Erfa dots')
    ap.add_argument('--dotsize-treff', type=float, default=8, help='Radius of Chaostreff dots')
    ap.add_argument('--rename', type=str, action='append', nargs=2, metavar=('FROM', 'TO'), default=[], help='Rename a city with an overly long name (e.g. "Rothenburg ob der Tauber" to "Rothenburg")')
    ap.add_argument('--projection', type=CoordinateTransform, default='epsg:4258', help='Map projection to convert the WGS84 coordinates to')
    ap.add_argument('--scale-x', type=float, default=130, help='X axis scale to apply after projecting')
    ap.add_argument('--scale-y', type=float, default=200, help='Y axis scale to apply after projecting')
    ap.add_argument('--png-scale', type=float, default=1.0, help='Scale of the PNG image')
    ap.add_argument('--debug', action='store_true', default=False, help='Add debug information to the produced SVG')

    ns = ap.parse_args(sys.argv[1:])
    cache = ns.cache_directory

    if ns.update_borders or not os.path.isdir(cache.shapes_countries):
        # Drop the old shapes so that entities no longer returned disappear.
        if os.path.isdir(cache.shapes_countries):
            shutil.rmtree(cache.shapes_countries)
        print('Retrieving country border shapes')
        Country.fetch(target=cache.shapes_countries)

    if ns.update_borders or not os.path.isdir(cache.shapes_states):
        if os.path.isdir(cache.shapes_states):
            shutil.rmtree(cache.shapes_states)
        print('Retrieving state border shapes')
        FederalState.fetch(target=cache.shapes_states)

    if ns.update_erfalist or not os.path.isfile(cache.erfa_info):
        if os.path.exists(cache.erfa_info):
            os.unlink(cache.erfa_info)
        print('Retrieving Erfas information')
        Erfa.fetch(ns, target=cache.erfa_info, radius=ns.dotsize_erfa)

    if ns.update_erfalist or not os.path.isfile(cache.chaostreff_info):
        if os.path.exists(cache.chaostreff_info):
            os.unlink(cache.chaostreff_info)
        print('Retrieving Chaostreffs information')
        # fetch() requires the parsed arguments as its first argument.
        Chaostreff.fetch(ns, target=cache.chaostreff_info, radius=ns.dotsize_treff)

    bbox = compute_bbox(ns)

    print('Filtering countries outside the bounding box')
    Country.filter_boundingbox(ns, cache.shapes_countries, cache.shapes_filtered, bbox)

    create_svg(ns, bbox)
|
|
|
|
|
|
# Run the generator only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|