diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index e6ca21c..6198b66 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -24,6 +24,10 @@ jobs: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. flake8 . --count --exit-zero --max-complexity=10 --max-line-length=160 --statistics + - name: Typechecking with mypy + run: | + pip install mypy + mypy . - name: Test with pytest run: | pip install pytest diff --git a/README.md b/README.md index a21fcf3..17190cc 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ -# OsmToRoadGraph v.0.4.2 +# OsmToRoadGraph v.0.4.3 [![Build Status](https://travis-ci.org/AndGem/OsmToRoadGraph.svg?branch=master)](https://travis-ci.org/AndGem/OsmToRoadGraph) [![codecov](https://codecov.io/gh/AndGem/OsmToRoadGraph/branch/master/graph/badge.svg)](https://codecov.io/gh/AndGem/OsmToRoadGraph) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) -- [OsmToRoadGraph v.0.4.2](#osmtoroadgraph-v042) +- [OsmToRoadGraph v.0.4.3](#osmtoroadgraph-v043) - [Introduction](#introduction) - [Motivation](#motivation) - [Description](#description) @@ -73,7 +73,7 @@ python run.py -f data/karlsruhe_small.osm -n p -v ### Output -The output will consist of two plaintext files. One file ending in `.pypgr`, `pybgr`, or `pycgr` depending on the network type selected; the other file will have the same ending with a `_names` as additional suffix. The first file contains the graph structure as well as additional information about the edge (length, max speed according to highway type, if it is a one-way street or not). The file ending with `_names` includes the street names for the edges. +The output will consist of two plaintext files. One file ending in `.pypgr`, `pybgr`, or `pycgr` depending on the network type selected; the other file will have the same ending with a `_names` as additional suffix. The first file contains the graph structure as well as additional information about the edge (length, max speed according to highway type, if it is a one-way street or not). The file ending with `_names` includes the street names for the edges. #### Output Format diff --git a/graph/contract_graph.py b/graph/contract_graph.py index b094783..1763bb8 100644 --- a/graph/contract_graph.py +++ b/graph/contract_graph.py @@ -5,12 +5,12 @@ import utils.timer as timer from graph.graph import Graph -from graph.graph_types import Edge, SimpleEdge, Vertex +from graph.graph_types import VertexType, EdgeType from typing import Dict, List, Optional, Set, Tuple @timer.timer -def contract(graph): +def contract(graph: Graph): all_new_edges = find_new_edges(graph) filtered_edges = remove_duplicates(all_new_edges) node_ids = gather_node_ids(filtered_edges) @@ -19,11 +19,11 @@ def contract(graph): return graphfactory.build_graph_from_vertices_edges(nodes, filtered_edges) -def get_nodes(graph: Graph, node_ids: Set[int]) -> List[Vertex]: +def get_nodes(graph: Graph, node_ids: Set[int]) -> List[VertexType]: return list(map(lambda node_id: graph.get_node(node_id), node_ids)) -def gather_node_ids(edges: List[SimpleEdge]) -> Set[int]: +def gather_node_ids(edges: List[EdgeType]) -> Set[int]: print("\t gathering nodes...") node_ids = set() for e in edges: @@ -32,9 +32,9 @@ def gather_node_ids(edges: List[SimpleEdge]) -> Set[int]: return node_ids -def remove_duplicates(edges: List[SimpleEdge]) -> List[SimpleEdge]: +def remove_duplicates(edges: List[EdgeType]) -> List[EdgeType]: print("\t removing duplicate edges...") - added_edges = set() + added_edges: Set[Tuple[int, int]] = set() filtered_edges = [] for edge in edges: if (edge.s, edge.t) not in added_edges and (edge.t, edge.s) not in added_edges: @@ -44,7 +44,7 @@ def remove_duplicates(edges: List[SimpleEdge]) -> List[SimpleEdge]: return filtered_edges -def find_new_edges(graph: Graph) -> List[SimpleEdge]: +def find_new_edges(graph: Graph) -> List[EdgeType]: edge_by_s_t = edge_mapping(graph) all_new_edges = [] for node_id in range(len(graph.vertices)): @@ -60,7 +60,7 @@ def find_new_edges(graph: Graph) -> List[SimpleEdge]: return all_new_edges -def edge_mapping(graph: Graph) -> Dict[Tuple[int, int], Edge]: +def edge_mapping(graph: Graph) -> Dict[Tuple[int, int], EdgeType]: edge_by_s_t = {} for edge in graph.edges: edge_by_s_t[(edge.s, edge.t)] = edge @@ -77,7 +77,7 @@ def is_important_node(graph: Graph, node_id: int) -> bool: return len(graph.all_neighbors(node_id)) != 2 -def get_edges(nodes: List[int], edges_by_s_t: Dict[Tuple[int, int], Edge]) -> List[Edge]: +def get_edges(nodes: List[int], edges_by_s_t: Dict[Tuple[int, int], EdgeType]) -> List[EdgeType]: if nodes: edges = [] for i in range(len(nodes) - 1): @@ -87,7 +87,7 @@ def get_edges(nodes: List[int], edges_by_s_t: Dict[Tuple[int, int], Edge]) -> Li return [] -def merge_edges(edges: List[Edge]) -> Optional[SimpleEdge]: +def merge_edges(edges: List[EdgeType]) -> Optional[EdgeType]: if edges: s, t = edges[0].s, edges[-1].t if s != t: diff --git a/graph/graph.py b/graph/graph.py index 87e9e25..328a030 100644 --- a/graph/graph.py +++ b/graph/graph.py @@ -1,16 +1,16 @@ -from graph.graph_types import Edge, SimpleEdge, Vertex -from typing import List, Union +from graph.graph_types import EdgeType, VertexType +from typing import List, Set class Graph(object): def __init__(self) -> None: - self.edges = [] - self.vertices = [] - self.outneighbors = [] - self.inneighbors = [] + self.edges: List[EdgeType] = [] + self.vertices: List[VertexType] = [] + self.outneighbors: List[Set[int]] = [] + self.inneighbors: List[Set[int]] = [] - def add_edge(self, edge: Union[SimpleEdge, Edge]) -> None: + def add_edge(self, edge: EdgeType) -> None: self.edges.append(edge) if edge.forward: @@ -21,12 +21,12 @@ def add_edge(self, edge: Union[SimpleEdge, Edge]) -> None: self.outneighbors[edge.t].add(edge.s) self.inneighbors[edge.s].add(edge.t) - def add_node(self, vertex: Vertex) -> None: + def add_node(self, vertex: VertexType) -> None: self.vertices.append(vertex) self.outneighbors.append(set()) self.inneighbors.append(set()) - def get_node(self, node_id: int) -> Vertex: + def get_node(self, node_id: int) -> VertexType: return self.vertices[node_id] def edge_description(self, edge_id): diff --git a/graph/graph_types.py b/graph/graph_types.py index 4ef8937..5801138 100644 --- a/graph/graph_types.py +++ b/graph/graph_types.py @@ -1,3 +1,6 @@ +from typing import Union + + class Vertex(object): __slots__ = ["id", "lat", "lon"] @@ -14,12 +17,14 @@ class Edge(object): __slots__ = ["s", "t", "length", "highway", "max_v", "forward", "backward", "name"] def __init__(self, s: int, t: int, length: float, highway: str, max_v: int, f: bool, b: bool, name: str) -> None: - self.s, self.t = s, t - self.length = length - self.highway = highway - self.max_v = max_v - self.forward, self.backward = f, b - self.name = name + self.s: int = s + self.t: int = t + self.length: float = length + self.highway: str = highway + self.max_v: int = max_v + self.forward: bool = f + self.backward: bool = b + self.name: str = name @property def description(self) -> str: @@ -31,9 +36,10 @@ class SimpleEdge(object): __slots__ = ["s", "t", "length", "name"] def __init__(self, s: int, t: int, length: float) -> None: - self.s, self.t = s, t - self.length = length - self.name = "" + self.s: int = s + self.t: int = t + self.length: float = length + self.name: str = "" @property def forward(self) -> bool: @@ -46,3 +52,7 @@ def backward(self) -> bool: @property def description(self) -> str: return "{} {} {}".format(self.s, self.t, self.length) + + +EdgeType = Union[Edge, SimpleEdge] +VertexType = Union[Vertex] diff --git a/graph/graphfactory.py b/graph/graphfactory.py index b5c7db2..7670280 100644 --- a/graph/graphfactory.py +++ b/graph/graphfactory.py @@ -1,7 +1,7 @@ import copy import graph.graph as graph -import graph.graph_types as graph_types +from graph.graph_types import Vertex, Edge import utils.geo_tools as geo_tools import utils.timer as timer @@ -18,7 +18,7 @@ def build_graph_from_osm(nodes, ways): node_ids = nodes.keys() id_mapper = dict(zip(node_ids, range(len(node_ids)))) for n in nodes.values(): - g.add_node(graph_types.Vertex(id_mapper[n.osm_id], n.lat, n.lon)) + g.add_node(Vertex(id_mapper[n.osm_id], n.lat, n.lon)) # 2. go through all ways and add edges accordingly for w in ways: @@ -26,7 +26,7 @@ def build_graph_from_osm(nodes, ways): s_id, t_id = id_mapper[w.nodes[i]], id_mapper[w.nodes[i + 1]] s, t = g.vertices[s_id], g.vertices[t_id] length = geo_tools.distance(s.lat, s.lon, t.lat, t.lon) - edge = graph_types.Edge(s_id, t_id, length, w.highway, w.max_speed, w.forward, w.backward, w.name) + edge = Edge(s_id, t_id, length, w.highway, w.max_speed, w.forward, w.backward, w.name) g.add_edge(edge) return g @@ -40,7 +40,7 @@ def build_graph_from_vertices_edges(vertices, edges): vertex_ids = set([v.id for v in vertices]) id_mapper = dict(zip(vertex_ids, range(len(vertex_ids)))) for v in vertices: - g.add_node(graph_types.Vertex(id_mapper[v.id], v.lat, v.lon)) + g.add_node(Vertex(id_mapper[v.id], v.lat, v.lon)) # 2. add all edges that are valid new_edges = [copy.deepcopy(e) for e in edges if e.s in vertex_ids and e.t in vertex_ids] diff --git a/osm/osm_types.py b/osm/osm_types.py index bf158bb..de33822 100644 --- a/osm/osm_types.py +++ b/osm/osm_types.py @@ -1,5 +1,4 @@ -# -*- coding: utf-8 -*- -from typing import Optional +from typing import List, Optional class OSMWay: @@ -7,11 +6,11 @@ class OSMWay: def __init__(self, osm_id: int) -> None: self.osm_id = osm_id - self.nodes = [] - self.highway = None - self.area = None - self.max_speed = None - self.direction = None + self.nodes: List[int] = [] + self.highway: Optional[str] = None + self.area: Optional[str] = None + self.max_speed: Optional[str] = None + self.direction: Optional[str] = None self.forward = True self.backward = True self.name = "" @@ -23,6 +22,7 @@ def add_node(self, osm_id: int) -> None: class OSMNode(object): __slots__ = ["lat", "lon", "osm_id"] - def __init__(self, osm_id: int, lat: Optional[float] = None, lon: Optional[float] = None) -> None: - self.osm_id = osm_id - self.lat, self.lon = lat, lon + def __init__(self, osm_id: int, lat: float, lon: float) -> None: + self.osm_id: int = osm_id + self.lat: float = lat + self.lon: float = lon diff --git a/osm/way_parser_helper.py b/osm/way_parser_helper.py index 94ecb34..ad476a5 100644 --- a/osm/way_parser_helper.py +++ b/osm/way_parser_helper.py @@ -1,9 +1,12 @@ +from osm.osm_types import OSMWay + + class WayParserHelper: def __init__(self, config): self.config = config - def is_way_acceptable(self, way): + def is_way_acceptable(self, way: OSMWay): # reject: if the way marks the border of an area if way.area == 'yes': return False diff --git a/osm/xml_handler.py b/osm/xml_handler.py index 60995a3..856bfb5 100644 --- a/osm/xml_handler.py +++ b/osm/xml_handler.py @@ -1,16 +1,13 @@ import os import sys -import xml.sax +from typing import Optional, Set, List, Dict +from xml.sax.xmlreader import AttributesImpl +from xml.sax.handler import ContentHandler from osm.osm_types import OSMWay, OSMNode - from osm.way_parser_helper import WayParserHelper -from typing import Optional, Set -from xml.sax.xmlreader import AttributesImpl -try: - intern = sys.intern -except AttributeError: - pass + +intern = sys.intern class PercentageFile(object): @@ -45,77 +42,77 @@ def percentage(self) -> float: return float(self.delivered) / self.size * 100.0 -class NodeHandler(xml.sax.ContentHandler): +class NodeHandler(ContentHandler): def __init__(self, found_nodes: Set[int]) -> None: - self.found_nodes = found_nodes - self.nodes = {} + self.found_nodes: Set[int] = found_nodes + self.nodes: Dict[int, OSMNode] = {} - def startElement(self, tag: str, attributes: AttributesImpl) -> None: - if tag == "node": - osm_id = int(attributes["id"]) + def startElement(self, name: str, attrs: AttributesImpl) -> None: + if name == "node": + osm_id = int(attrs["id"]) if osm_id not in self.found_nodes: return - self.nodes[osm_id] = OSMNode(osm_id, float(attributes["lat"]), float(attributes["lon"])) + self.nodes[osm_id] = OSMNode(osm_id, float(attrs["lat"]), float(attrs["lon"])) -class WayHandler(xml.sax.ContentHandler): +class WayHandler(ContentHandler): def __init__(self, parser_helper: WayParserHelper) -> None: - # stores all found ways - self.found_ways = [] - self.found_nodes = set() + self.found_ways: List[OSMWay] = [] + self.found_nodes: Set[int] = set() - self.start_tag_found = False - self.current_way = None + self.current_way: Optional[OSMWay] = None self.parser_helper = parser_helper + print(self.current_way) + print(self.current_way is not None) - def startElement(self, tag: str, attributes: AttributesImpl) -> None: - if tag == "way": - self.start_tag_found = True - self.current_way = OSMWay(int(attributes["id"])) + def startElement(self, name: str, attrs: AttributesImpl) -> None: + if name == "way": + self.current_way = OSMWay(osm_id=int(attrs["id"])) return - if self.start_tag_found: + if self.current_way is not None: try: - if tag == "nd": - # gather nodes - node_id = int(attributes["ref"]) + if name == "nd": + node_id = int(attrs["ref"]) self.current_way.add_node(node_id) - elif tag == "tag": - if attributes["k"] == "highway": - self.current_way.highway = attributes["v"] - elif attributes["k"] == "area": - self.current_way.area = attributes["v"] - elif attributes["k"] == "maxspeed": - self.current_way.max_speed = str(attributes["v"]) - elif attributes["k"] == "oneway": - if attributes["v"] == "yes": + elif name == "tag": + if attrs["k"] == "highway": + print(self.current_way) + print(attrs["v"]) + self.current_way.highway = attrs["v"] + elif attrs["k"] == "area": + self.current_way.area = attrs["v"] + elif attrs["k"] == "maxspeed": + self.current_way.max_speed = str(attrs["v"]) + elif attrs["k"] == "oneway": + if attrs["v"] == "yes": self.current_way.direction = "oneway" - elif attributes["k"] == "name": + elif attrs["k"] == "name": try: - self.current_way.name = intern(attributes["v"]) + self.current_way.name = intern(attrs["v"]) except TypeError: - self.current_way.name = attributes["v"] - elif attributes["k"] == "junction": - if attributes["v"] == "roundabout": + self.current_way.name = attrs["v"] + elif attrs["k"] == "junction": + if attrs["v"] == "roundabout": self.current_way.direction = "oneway" - elif attributes["k"] == "indoor": + elif attrs["k"] == "indoor": # this is not an ideal solution since it sets the pedestrian flag irrespective of the real value in osm data # but aims to cover the simple indoor tagging approach: https://wiki.openstreetmap.org/wiki/Simple_Indoor_Tagging # more info: https://help.openstreetmap.org/questions/61025/pragmatic-single-level-indoor-paths - if attributes["v"] == "corridor": + if attrs["v"] == "corridor": self.current_way.highway = "pedestrian_indoor" except: e = sys.exc_info()[0] print("Error while parsing: {}".format(e)) - def endElement(self, tag: str) -> None: - if tag == "way": - self.start_tag_found = False + def endElement(self, name: str) -> None: + if name == "way": + assert self.current_way is not None if not self.parser_helper.is_way_acceptable(self.current_way): self.current_way = None