from typing import Literal
import geopandas as gpd
import networkx as nx
import numpy as np
import pandas as pd
from geopandas import GeoDataFrame
from pyproj import CRS
from shapely import LineString, MultiLineString, Polygon, line_merge
from shapely.geometry.multipolygon import MultiPolygon
from iduedu import config
from iduedu.enums.highway_enums import HighwayType
from iduedu.enums.network_enums import Network
from iduedu.modules.graph_transformers import estimate_crs_for_bounds, keep_largest_strongly_connected_component
from iduedu.modules.overpass_downloaders import get_4326_boundary, get_network_by_filters
# Module-level logger, taken from the shared iduedu config object.
logger = config.logger
def _get_highway_properties(highway) -> tuple[str, float]:
    """
    Map OSM `highway` class(es) to a regulatory category and a default speed.

    Accepts a single class name or a collection of class names. When several classes are given,
    the lowest-ranked regulatory status (local < regional < federal) and the minimum default speed
    among the recognised classes are returned.

    Parameters:
        highway (str | list[str] | tuple[str, ...] | set[str] | None): OSM highway class or a collection
            of classes (e.g., "primary", "residential", ...). Missing values (None, NaN) and any other
            non-string input are treated as "no class".

    Returns:
        (tuple[str, float]): A pair `(category, maxspeed_mpm)` where:
            - `category` (str): regulatory class label, e.g., "local" | "regional" | "federal".
            - `maxspeed_mpm` (float): default speed of the class, as stored in `HighwayType.max_speed`.

    Notes:
        If no class can be resolved to a `HighwayType` member, the values of
        `HighwayType.UNCLASSIFIED` are returned.
    """
    default = (HighwayType.UNCLASSIFIED.reg_status, HighwayType.UNCLASSIFIED.max_speed)

    if isinstance(highway, str):
        candidates = [highway]
    elif isinstance(highway, (list, tuple, set)):
        candidates = list(highway)
    else:
        # None, float NaN coming from pandas, or any other unsupported type: nothing to look up.
        return default

    matched = []
    for name in candidates:
        # Collections may themselves contain missing values; skip anything that is not a class name.
        if not isinstance(name, str):
            continue
        try:
            matched.append(HighwayType[name.upper()])
        except KeyError:
            continue

    if not matched:
        return default

    # Unknown status labels sort last so that a known status always wins.
    rank = {"local": 0, "regional": 1, "federal": 2}
    lowest_reg = min((member.reg_status for member in matched), key=lambda status: rank.get(status, 999))
    min_speed = min(member.max_speed for member in matched)
    return lowest_reg, min_speed
def _build_edges_from_overpass(polygon: Polygon, way_filter: str, simplify: bool = True) -> tuple[GeoDataFrame, CRS]:
    """
    Download OSM ways by filter, segment them into edges, and project to a local CRS.

    The function queries the ways within `polygon` using an Overpass `way_filter`, converts each way
    into consecutive two-point line segments (one edge per segment), and projects the result to a local
    CRS estimated from the polygon bounds. If `simplify=True`, contiguous segments are merged with a
    directed `line_merge`, and OSM attributes are transferred to the merged lines via a nearest join
    from each merged line's midpoint.

    Parameters:
        polygon (Polygon): Boundary polygon in EPSG:4326 used to query OSM data.
        way_filter (str): Overpass filter applied to ways (e.g., `[ "highway" ~ "motorway|trunk|primary|..." ]`).
        simplify (bool): If True, merges contiguous segments into longer edges and reattaches attributes
            using a nearest midpoint join.

    Returns:
        (tuple[gpd.GeoDataFrame, CRS]): A pair `(edges, local_crs)` where:
            - `edges` (GeoDataFrame): Line features in local CRS with columns:
              `geometry` (LineString), `way_idx` (source way index), `id` (OSM way id), `tags` (dict).
            - `local_crs` (pyproj.CRS): CRS returned by `estimate_crs_for_bounds` for the polygon bounds.

    Raises:
        ValueError: If the Overpass response contains no ways for the given polygon and filter.

    Notes:
        Attributes on merged edges are taken from a single original segment nearest to the midpoint,
        so per-segment variability along a merged line is lost.
    """
    data = get_network_by_filters(polygon, way_filter)
    ways = data[data["type"] == "way"].copy()
    if ways.empty:
        # Fail with a readable message instead of an opaque np.concatenate error further down.
        raise ValueError("No OSM ways found for the given territory and filter")

    # Collect (lon, lat) coordinates of every way; reshape keeps a 2-column layout even for
    # a way without points, so the concatenations below never see mismatched dimensions.
    coords_list = [
        np.asarray([(p["lon"], p["lat"]) for p in pts], dtype="f8").reshape(-1, 2)
        for pts in ways["geometry"].values
    ]

    # Split every way into consecutive two-point segments.
    starts = np.concatenate([a[:-1] for a in coords_list], axis=0)
    ends = np.concatenate([a[1:] for a in coords_list], axis=0)
    lengths = np.array([a.shape[0] for a in coords_list], dtype=int)
    seg_counts = np.maximum(lengths - 1, 0)
    # Each segment remembers the index label of the way it came from.
    way_idx = np.repeat(ways.index.values, seg_counts)
    geoms = [LineString([tuple(s), tuple(e)]) for s, e in zip(starts, ends)]

    # Project to the local CRS.
    local_crs = estimate_crs_for_bounds(*polygon.bounds)
    edges = gpd.GeoDataFrame({"way_idx": way_idx}, geometry=geoms, crs=4326).to_crs(local_crs)
    edges = edges.join(ways[["id", "tags"]], on="way_idx")

    if simplify and not edges.empty:
        # Stitch segments together and carry attributes over via midpoints -> nearest segment.
        # TODO: could be faster if segments were split by tags and merged in parallel (same for DRIVE).
        merged_geom = line_merge(MultiLineString(edges.geometry.to_list()), directed=True)
        # line_merge returns a bare LineString (no `.geoms`) when everything collapses into one line.
        merged = [merged_geom] if isinstance(merged_geom, LineString) else list(merged_geom.geoms)

        lines = gpd.GeoDataFrame(geometry=merged, crs=local_crs)
        mid = lines.copy()
        mid.geometry = mid.interpolate(lines.length / 2)

        # TODO: attributes are lost here; consider aggregating them (e.g., into lists) instead.
        joined = gpd.sjoin_nearest(mid[["geometry"]], edges, how="left", max_distance=1)
        # A midpoint can be equidistant to several segments; keep the first match per merged line.
        joined = joined.reset_index().drop_duplicates(subset="index").set_index("index")
        lines = lines.join(joined[["tags", "id", "way_idx"]])
        edges = lines

    return edges, local_crs
[docs]
def get_drive_graph(
    *,
    osm_id: int | None = None,
    territory: Polygon | MultiPolygon | gpd.GeoDataFrame | None = None,
    simplify: bool = True,
    add_road_category: bool = True,
    clip_by_territory: bool = False,
    keep_largest_subgraph: bool = True,
    network_type: Literal["drive", "drive_service", "custom"] = "drive",
    custom_filter: str | None = None,
    osm_edge_tags: list[str] | None = None,  # overrides default tags
    keep_edge_geometry: bool = True,
) -> nx.MultiDiGraph:
    """
    Build a drivable road network (nx.MultiDiGraph) from OpenStreetMap within a given territory.

    The function downloads OSM ways via Overpass, segments them into directed edges, optionally merges
    contiguous segments, duplicates in reverse every edge whose `oneway` tag is not "yes", and computes
    per-edge length (meters) and travel time (minutes). Nodes are the unique edge endpoints in a local
    projected CRS and carry `x` / `y` attributes.

    Parameters:
        osm_id (int | None): OSM relation/area ID of the territory boundary. Provide this or `territory`.
        territory (Polygon | MultiPolygon | gpd.GeoDataFrame | None): Boundary geometry in EPSG:4326 or GeoDataFrame.
            Used when `osm_id` is not given.
        simplify (bool): If True, merges contiguous segments and transfers attributes back to merged lines
            using nearest midpoints. If False, keeps raw per-segment edges.
        add_road_category (bool): If True, stores the derived `category` (e.g., local/regional/federal),
            inferred from the OSM `highway` class, on every edge.
        clip_by_territory (bool): If True, clips edges by the exact boundary geometry before graph construction.
            Multi-part clip results are split into separate edges.
        keep_largest_subgraph (bool): If True, the graph is reduced with
            `keep_largest_strongly_connected_component`.
        network_type (Literal["drive","drive_service","custom"]): Preset of Overpass filters to select drivable ways.
            Use "custom" together with `custom_filter` to pass your own Overpass `way` filter.
        custom_filter (str | None): Custom Overpass filter (e.g., `["highway"~"motorway|trunk|…"]`) used when
            `network_type="custom"`.
        osm_edge_tags (list[str] | None): Which OSM tags to retain on edges. Overrides defaults from config.
            The tags `oneway`, `maxspeed`, and `highway` are always read for the internal computations, but are
            stored on edges only when they belong to the retained tag set.
        keep_edge_geometry (bool): If True, stores shapely geometries on edges (`geometry` attribute).

    Returns:
        (nx.MultiDiGraph): Directed multigraph of the road network with integer node ids. Each edge carries:
            - `geometry` (if `keep_edge_geometry=True`) in local CRS,
            - `length_meter` (float), `time_min` (float),
            - `type="drive"`,
            - the retained OSM tags that occur in the downloaded data, plus `category` if requested.
        Graph-level attributes: `graph["crs"]` (local projected CRS), `graph["type"]` (network_type).

    Raises:
        ValueError: If `network_type` is unknown, or `network_type="custom"` without `custom_filter`.

    Notes:
        `time_min` uses the numeric OSM `maxspeed` (interpreted as km/h and converted to meters per minute)
        when it parses as a number, otherwise the default speed of the `highway` class.
    """
    polygon4326 = get_4326_boundary(osm_id=osm_id, territory=territory)

    filters = {
        "drive": Network.DRIVE.filter,
        "drive_service": Network.DRIVE_SERVICE.filter,
        "custom": custom_filter,
    }
    try:
        road_filter = filters[network_type]
    except KeyError:
        raise ValueError(f"Unknown road_type: {network_type!r}") from None
    if network_type == "custom" and road_filter is None:
        raise ValueError("For road_type='custom' you must provide custom_filter")

    logger.info("Downloading drive network via Overpass ...")
    edges, local_crs = _build_edges_from_overpass(polygon4326, road_filter, simplify=simplify)

    if osm_edge_tags is None:
        needed_tags = set(config.drive_useful_edges_attr)
    else:
        needed_tags = set(osm_edge_tags)
    # These three tags drive direction, speed and category logic, so they are always extracted.
    internal_tags = ("oneway", "maxspeed", "highway")
    tags_to_retrieve = set(needed_tags) | set(internal_tags)

    tags_df = pd.DataFrame.from_records(
        (
            # A way without tags shows up as a non-dict (e.g. NaN); treat it as "no tags".
            {k: v for k, v in (d if isinstance(d, dict) else {}).items() if k in tags_to_retrieve}
            for d in edges["tags"]
        ),
        index=edges.index,
    )
    edges = edges.join(tags_df)
    # A tag that no downloaded way carries produces no column at all; add it as all-missing
    # so the lookups below do not fail with KeyError.
    for tag in internal_tags:
        if tag not in edges.columns:
            edges[tag] = None

    if clip_by_territory:
        clip_poly_gdf = gpd.GeoDataFrame(geometry=[polygon4326], crs=4326).to_crs(local_crs)
        # Clipping can cut a line into a MultiLineString; explode so every edge is one continuous
        # line and its first/last coordinates are real endpoints.
        edges = edges.clip(clip_poly_gdf, keep_geom_type=True).explode(ignore_index=True)

    # Two-way streets: duplicate with reversed geometry.
    two_way = edges[edges["oneway"] != "yes"].copy()
    two_way.geometry = two_way.geometry.reverse()
    edges = pd.concat([edges, two_way], ignore_index=True)

    # First and last coordinate of every edge, taken from the flattened coordinate array.
    coords = edges.geometry.get_coordinates().to_numpy()
    counts = edges.geometry.count_coordinates()
    cuts = np.cumsum(counts)
    first_idx = np.r_[0, cuts[:-1]]
    last_idx = cuts - 1
    starts = coords[first_idx]
    ends = coords[last_idx]
    edges["start"] = list(map(tuple, starts))
    edges["end"] = list(map(tuple, ends))

    # Factorize start and end points together so identical coordinates share one node id.
    all_endpoints = pd.Index(edges["start"]).append(pd.Index(edges["end"]))
    labels, uniques = pd.factorize(all_endpoints)
    n = len(edges)
    u = labels[:n]
    v = labels[n:]
    edges["u"] = u
    edges["v"] = v

    # Derive category and class-default speed; missing (non str/list) highway values map to the default.
    highway_props = pd.DataFrame(
        [_get_highway_properties(h if isinstance(h, (str, list)) else None) for h in edges["highway"]],
        columns=["category", "maxspeed_mpm"],
        index=edges.index,
    )
    edges["category"] = highway_props["category"]
    edges["maxspeed_mpm"] = highway_props["maxspeed_mpm"]

    # Numeric OSM maxspeed (km/h -> m/min) wins; non-numeric or missing values fall back to the class default.
    maxspeed_osm_mpm = (pd.to_numeric(edges["maxspeed"], errors="coerce") * 1000.0 / 60.0).round(3)
    edges["speed_mpm"] = maxspeed_osm_mpm.fillna(edges["maxspeed_mpm"])

    edges["length_meter"] = edges.geometry.length.round(3)
    edges["time_min"] = (edges["length_meter"] / edges["speed_mpm"]).round(3)
    edges["type"] = "drive"

    graph = nx.MultiDiGraph()
    graph.add_nodes_from((i, {"x": float(x), "y": float(y)}) for i, (x, y) in enumerate(uniques))

    if add_road_category:
        needed_tags |= {"category"}
    if keep_edge_geometry:
        needed_tags |= {"geometry"}
    needed_tags |= {"length_meter", "time_min", "type"}

    # Only attach attributes that actually exist as columns.
    edge_attr_cols = [tag for tag in needed_tags if tag in edges.columns]
    attrs_iter = edges[edge_attr_cols].to_dict("records")
    graph.add_edges_from((int(uu), int(vv), d) for uu, vv, d in zip(u, v, attrs_iter))

    if keep_largest_subgraph:
        graph = keep_largest_strongly_connected_component(graph)

    # Renumber the remaining nodes consecutively from 0.
    mapping = {old: new for new, old in enumerate(graph.nodes())}
    graph = nx.relabel_nodes(graph, mapping)
    graph.graph["crs"] = local_crs
    graph.graph["type"] = network_type
    logger.debug("Drive graph built.")
    return graph
[docs]
def get_walk_graph(
    *,
    osm_id: int | None = None,
    territory: Polygon | MultiPolygon | gpd.GeoDataFrame | None = None,
    simplify: bool = True,
    clip_by_territory: bool = False,
    keep_largest_subgraph: bool = True,
    walk_speed: float = 5 * 1000 / 60,  # m/min
    network_type: Literal["walk", "custom"] = "walk",
    custom_filter: str | None = None,
    osm_edge_tags: list[str] | None = None,  # overrides default tags
    keep_edge_geometry: bool = True,
) -> nx.MultiDiGraph:
    """
    Build a pedestrian network (nx.MultiDiGraph) from OpenStreetMap within a given territory.

    The function fetches OSM ways via Overpass using a walking filter, splits each way into directed
    line segments, duplicates all edges in reverse, and computes per-edge length (meters) and traversal
    time (minutes) using the given walking speed (m/min). Nodes are the unique edge endpoints in a local
    projected CRS and carry `x` / `y` attributes. Selected OSM tags can be attached to edges.

    Parameters:
        osm_id (int | None): OSM relation/area ID for the boundary. Provide this or `territory`.
        territory (Polygon | MultiPolygon | gpd.GeoDataFrame | None): Boundary geometry (EPSG:4326) or a GeoDataFrame
            to define the area when `osm_id` is not given.
        simplify (bool): If True, merges contiguous segments (via internal line merging) and transfers attributes
            back to merged lines using nearest midpoints; if False, keeps raw per-segment edges.
        clip_by_territory (bool): If True, clips edges by the provided boundary before graph construction;
            multi-part clip results are split into separate edges.
        keep_largest_subgraph (bool): If True, the graph is reduced with
            `keep_largest_strongly_connected_component`.
        walk_speed (float): Walking speed in meters per minute used to compute `time_min` for each edge.
        network_type (Literal["walk","custom"]): Preset of Overpass filters. Use "custom" together with `custom_filter`
            to pass your own way filter.
        custom_filter (str | None): Custom Overpass filter string used when `network_type="custom"`.
        osm_edge_tags (list[str] | None): List of OSM edge tags to retain (overrides defaults). Only these keys
            are joined from element tags; a tag that no downloaded way carries is simply absent from the edges.
        keep_edge_geometry (bool): If True, stores shapely `geometry` on edges in the local projected CRS.

    Returns:
        (nx.MultiDiGraph): Directed multigraph of the walking network with integer node ids. Each edge carries:
            - `geometry` (if `keep_edge_geometry=True`), local CRS,
            - `length_meter` (float), `time_min` (float),
            - `type="walk"`,
            - the requested OSM tags that occur in the downloaded data.
        Graph attributes include: `graph["crs"]` (local projected CRS),
        `graph["walk_speed"]` (float), and `graph["type"]` (network_type).

    Raises:
        ValueError: If `network_type` is unknown, or `network_type="custom"` without `custom_filter`.

    Notes:
        All walking edges are treated as bidirectional by duplicating geometries in reverse; u/v nodes
        are assigned by factorizing unique edge endpoints. Lengths are measured in a local projected
        CRS estimated from the territory bounds.
    """
    polygon4326 = get_4326_boundary(osm_id=osm_id, territory=territory)

    filters = {
        "walk": Network.WALK.filter,
        "custom": custom_filter,
    }
    try:
        road_filter = filters[network_type]
    except KeyError:
        raise ValueError(f"Unknown road_type: {network_type!r}") from None
    if network_type == "custom" and road_filter is None:
        raise ValueError("For road_type='custom' you must provide custom_filter")

    logger.info("Downloading walk network via Overpass ...")
    edges, local_crs = _build_edges_from_overpass(polygon4326, road_filter, simplify=simplify)

    if osm_edge_tags is None:
        needed_tags = set(config.walk_useful_edges_attr)
    else:
        needed_tags = set(osm_edge_tags)

    tags_df = pd.DataFrame.from_records(
        (
            # A way without tags shows up as a non-dict (e.g. NaN); treat it as "no tags".
            {k: v for k, v in (d if isinstance(d, dict) else {}).items() if k in needed_tags}
            for d in edges["tags"]
        ),
        index=edges.index,
    )
    edges = edges.join(tags_df)

    if clip_by_territory:
        clip_poly_gdf = gpd.GeoDataFrame(geometry=[polygon4326], crs=4326).to_crs(local_crs)
        # explode() splits multi-part clip results so every edge is one continuous line.
        edges = edges.clip(clip_poly_gdf, keep_geom_type=True).explode(ignore_index=True)

    # Every walking edge is traversable both ways: append a reversed copy of each one.
    two_way = edges.copy()
    two_way.geometry = two_way.geometry.reverse()
    edges = pd.concat([edges, two_way], ignore_index=True)

    # u/v and nodes: first and last coordinate of every edge from the flattened coordinate array.
    coords = edges.geometry.get_coordinates().to_numpy()
    counts = edges.geometry.count_coordinates()
    cuts = np.cumsum(counts)
    first_idx = np.r_[0, cuts[:-1]]
    last_idx = cuts - 1
    starts = coords[first_idx]
    ends = coords[last_idx]
    edges["start"] = list(map(tuple, starts))
    edges["end"] = list(map(tuple, ends))

    # Factorize start and end points together so identical coordinates share one node id.
    all_endpoints = pd.Index(edges["start"]).append(pd.Index(edges["end"]))
    labels, uniques = pd.factorize(all_endpoints)
    n = len(edges)
    u = labels[:n]
    v = labels[n:]

    edges["length_meter"] = edges.geometry.length.round(3)
    edges["time_min"] = (edges["length_meter"] / float(walk_speed)).round(3)
    edges["type"] = "walk"

    # Assemble the graph.
    graph = nx.MultiDiGraph()
    graph.add_nodes_from((i, {"x": float(x), "y": float(y)}) for i, (x, y) in enumerate(uniques))

    edge_attrs = set(needed_tags)
    if keep_edge_geometry:
        edge_attrs |= {"geometry"}
    edge_attrs |= {"length_meter", "time_min", "type"}

    # A requested tag that no downloaded way carries has no column; selecting it would raise KeyError.
    edge_attr_cols = [col for col in edge_attrs if col in edges.columns]
    attrs_iter = edges[edge_attr_cols].to_dict("records")
    graph.add_edges_from((int(uu), int(vv), d) for uu, vv, d in zip(u, v, attrs_iter))

    if keep_largest_subgraph:
        graph = keep_largest_strongly_connected_component(graph)

    # Renumber the remaining nodes consecutively from 0.
    mapping = {old: new for new, old in enumerate(graph.nodes())}
    graph = nx.relabel_nodes(graph, mapping)
    graph.graph["crs"] = local_crs
    graph.graph["walk_speed"] = float(walk_speed)
    graph.graph["type"] = network_type
    logger.debug("Walk graph built.")
    return graph