#!/usr/bin/python3 import sys import gzip import time import datetime from lxml import etree # Knihovna lxml je Python binding na C knihovnu lxml, takže většina zpracování XMLka # je implementovaná rychle v C. # Lze ji získat pomocí `pip3 install lxml` def die(exit_code, message): print(message, file=sys.stderr) sys.exit(exit_code) def getTag(el, name): """Vrací hodnotu daného tagu nebo None Arguments: el {etree.Element} -- Naparsovaný XML element name {str} -- Název tagu Returns: bool -- If the elements contains the tag. """ for tag in el.findall("tag"): if tag.get('k') == name: return tag.get('v') return None def hasTag(el, name): """Kontroluje jestli má element daný tag (bez kontroly hodnoty). Arguments: el {etree.Element} -- Naparsovaný XML element name {str} -- Název tagu Returns: bool """ return (getTag(el, name) is not None) ################## nodes, ways, relations = 0, 0, 0 streets = {} start = time.time() # Funkce na vypisování statistiky def printStat(readed): megabytes = readed / 1024 / 1024 elapsed = time.time() - start elapsedString = str(datetime.timedelta(seconds=elapsed)) speed = megabytes/elapsed if elapsed > 0 else 0 print( f"Elapsed: {elapsedString:20s} Read: {megabytes:10.2f}MB ({speed:5.2f}MB/s)" + f"\tNodes: {nodes:8d}\tWays: {ways:8d}\tRelations: {relations:8d}\tFound streets: {len(streets):8d}", file=sys.stderr, end="\r", flush=True ) ################## if len(sys.argv) != 2: die(1, f"Usage: {sys.argv[0]} \n") # 1. Otevřeme gzip soubor printStat(0) with gzip.open(sys.argv[1], 'rb') as gzippedFile: # Budeme streamově parsovat pomocí iterparse(). Ten se může "chytat" na více # typů eventů - nám bude stačit defaultní "end", který nastává vždy po načtení # celého tagu (tedy třeba po načtení ). Je potřeba vyjmenovat elementy, # které nás zajímají, abychom nebyli voláni například pro vnořené elementy # a nemuseli to speciálně ošetřovat. for event, elem in etree.iterparse(gzippedFile, tag=('node', 'way', 'relation')): if elem.tag == 'node': nodes += 1 elif elem.tag == 'way': ways += 1 if hasTag(elem, "highway"): name = getTag(elem, "name") if name is not None: if name not in streets: streets[name] = 0 streets[name] += 1 elif elem.tag == 'relation': relations += 1 # Vypíšeme statistiku každých 10000 načtených elementů if (nodes+ways+relations) % 10000 == 0: printStat(gzippedFile.tell()) # Odstraníme zpracovaný element z paměti (musíme udělat explicitně, jinak by tam zůstal). # Musíme smazat i všechny rodiče současného elementu, kteří byli vytvořeni při parsování # (jinak by nám elementy zůstaly a postupně užíraly paměť, souvisí to s interním fungováním # knihovny lxml) elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] del elem printStat(gzippedFile.tell()) elapsedString = str(datetime.timedelta(seconds=time.time() - start)) print(f"\n\nTotal time: {elapsedString}\nTotal uniques streets: {len(streets)}", file=sys.stderr) for name in streets: print(f"{name}\t{streets[name]}")