import concurrent.futures
from typing import Any
import geopandas as gpd
import networkx as nx
import pandas as pd
from shapely import LineString, MultiPolygon, Point, Polygon
from shapely.ops import substring
from iduedu import config
from iduedu.modules.drive_walk_builders import get_walk_graph
from iduedu.modules.graph_transformers import keep_largest_strongly_connected_component
from iduedu.modules.overpass_downloaders import get_4326_boundary
from iduedu.modules.public_transport_builders import get_all_public_transport_graph
logger = config.logger
[docs]
def join_pt_walk_graph(
    public_transport_g: nx.Graph,
    walk_g: nx.Graph,
    max_dist=20,
    keep_largest_subgraph: bool = True,
) -> nx.Graph:
    """
    Join a public-transport graph with a pedestrian graph into a single intermodal network.

    The function relabels node ids to avoid collisions, finds PT platforms (including subway
    entries/exits), projects them onto the nearest walking edges within `max_dist`, and connects
    the two networks. When multiple platforms project to the same point on a walk edge, PT nodes
    are merged. For each platform–edge match, the walking edge is split at the projection point
    (or its endpoint is replaced by the platform node), and new directed walk edges are created
    with length/time attributes. Finally, both graphs are composed and (optionally) pruned to
    the largest strongly connected component.

    Parameters:
        public_transport_g (nx.Graph): PT graph with node attributes `x`, `y`, `type`, `route`
            and a graph CRS in `graph["crs"]`. Platform-like nodes are those with
            `type in {"platform","subway_entry_exit","subway_entry","subway_exit"}`.
        walk_g (nx.Graph): Pedestrian graph in the **same** CRS with edge `geometry` and graph
            attribute `graph["walk_speed"]` (meters/min). If `walk_speed` is missing, a default
            of 83.33 m/min is used.
        max_dist (float): Max search radius in meters to snap platforms to walk edges.
        keep_largest_subgraph (bool): If True, keep only the largest strongly connected component.

    Returns:
        (nx.Graph): Intermodal `MultiDiGraph`:
            - nodes: union of PT and Walk nodes (platforms become vertices of the walk network);
            - edges: original PT + split/updated Walk edges with `geometry`, `length_meter`,
              `time_min`, `type="walk"` (for inserted walking segments).
            Graph attrs: `graph["type"]="intermodal"`.

    Raises:
        AttributeError: If the two graphs declare different CRS values.
        ValueError: If two distinct platforms on one edge yield a zero-length split segment.

    Notes:
        - Node ids of PT and Walk parts are relabeled to disjoint ranges before composition,
          then relabeled densely (0..N-1) in the final graph.
        - Platforms projecting exactly to a walk edge endpoint trigger in-place node relabeling
          of that endpoint to the platform id (no edge splitting needed).
        - Duplicate projection points are detected; PT platforms are merged and both their
          incoming and outgoing PT edges are rewired onto the chosen representative, which
          then carries the union of the merged `route` values.
        - Nodes without an `x` attribute are dropped before the connectivity pruning.
    """
    if public_transport_g.graph["crs"] != walk_g.graph["crs"]:
        raise AttributeError(
            f"CRS mismatching, public_transport_graph crs {public_transport_g.graph['crs']}, walk_graph crs {walk_g.graph['crs']}"
        )
    logger.info("Composing intermodal graph...")

    # Move both graphs into disjoint integer id ranges: PT first, walk right after it.
    num_nodes_g1 = len(public_transport_g.nodes)
    mapping_g1 = {node: idx for idx, node in enumerate(public_transport_g.nodes)}
    mapping_g2 = {node: idx + num_nodes_g1 for idx, node in enumerate(walk_g.nodes)}
    transport: nx.DiGraph = nx.relabel_nodes(public_transport_g, mapping_g1)
    walk: nx.MultiDiGraph = nx.relabel_nodes(walk_g, mapping_g2)

    platforms = pd.DataFrame.from_dict(dict(transport.nodes(data=True)), orient="index")
    # .copy() so that adding the geometry column does not write into a slice of the full frame.
    platforms = platforms[
        platforms["type"].isin(["platform", "subway_entry_exit", "subway_entry", "subway_exit"])
    ].copy()
    platforms["geometry"] = gpd.points_from_xy(platforms["x"], platforms["y"])
    platforms = gpd.GeoDataFrame(platforms, crs=transport.graph["crs"])

    logger.debug("creating walk edges gdf")
    walk_edges = gpd.GeoDataFrame(
        nx.to_pandas_edgelist(walk.to_undirected(), source="u", target="v", edge_key="k"), crs=walk.graph["crs"]
    )
    # sjoin_nearest keeps only the left geometry, so keep the edge line in a separate column.
    walk_edges["edge_geometry"] = walk_edges["geometry"]

    logger.debug("joining nearest")
    projection_join = platforms.reset_index().sjoin_nearest(walk_edges, max_distance=max_dist, distance_col="dist")
    projection_join["project_dist"] = projection_join["edge_geometry"].project(projection_join["geometry"])
    projection_join["project_point"] = gpd.GeoSeries(
        projection_join["edge_geometry"].interpolate(projection_join["project_dist"]), crs=walk.graph["crs"]
    ).set_precision(1)
    projection_join["route"] = projection_join["route"].apply(lambda x: x if isinstance(x, list) else [x])

    logger.debug("searching for duplicated project points")
    # One row per distinct (precision-snapped) projection point; "index" collects every platform
    # id that landed on it, "route" the de-duplicated union of their routes.
    projection_join = projection_join.groupby(by="project_point", as_index=False).agg(
        {
            "index": lambda x: list(dict.fromkeys(item for item in x)),
            "index_right": "first",
            "geometry": "first",
            "u": "first",
            "v": "first",
            "k": "first",
            "project_dist": "first",
            "route": lambda x: list(dict.fromkeys(item for sublist in x for item in sublist)),
        }
    )
    for ind, row in projection_join.iterrows():
        platform_ids = row["index"]
        platform_in = platform_ids[0]
        merged_any = False
        for platform_out in platform_ids[1:]:
            if platform_out == platform_in:
                continue
            _merge_platform_into(transport, keep=platform_in, drop=platform_out)
            merged_any = True
        if merged_any:
            # The representative now stands for all merged platforms, so it carries all routes.
            transport.nodes[platform_in]["route"] = row["route"]
        # Collapse the id list to the single representative id.
        projection_join.loc[ind, "index"] = platform_in

    logger.debug("Projecting platforms to walk graph")
    points_grouped_by_edge = projection_join.groupby(by="index_right", as_index=False).agg(
        {
            "index": tuple,
            "geometry": tuple,
            "u": "first",
            "v": "first",
            "k": "first",
        }
    )
    try:
        speed = walk.graph["walk_speed"]
    except KeyError:
        logger.warning(
            "There is no walk_speed in graph, set to the default speed - 83.33 m/min"
        )  # TODO: estimate the speed from length_meter / time_min of any existing edge
        speed = 83.33

    # Rows are re-read by position on every iteration on purpose: relabeling an endpoint
    # rewrites the "u"/"v" columns of rows that have not been processed yet.
    for i in range(len(points_grouped_by_edge)):
        _insert_platforms_on_edge(walk, points_grouped_by_edge, points_grouped_by_edge.iloc[i], speed)

    logger.debug("Composing graphs")
    intermodal = nx.compose(nx.MultiDiGraph(transport), nx.MultiDiGraph(walk))
    # Drop nodes without coordinates before pruning, so that the component kept below
    # is computed on the nodes that actually remain in the result.
    intermodal.remove_nodes_from([node for node, data in intermodal.nodes(data=True) if "x" not in data])
    if keep_largest_subgraph:
        intermodal = keep_largest_strongly_connected_component(intermodal)
    mapping = {old_label: new_label for new_label, old_label in enumerate(intermodal.nodes())}
    logger.debug("Relabeling")
    nx.relabel_nodes(intermodal, mapping, copy=False)
    intermodal.graph["type"] = "intermodal"
    logger.debug("Done composing!")
    return intermodal


def _merge_platform_into(transport: nx.Graph, keep, drop) -> None:
    """Rewire every PT edge incident to `drop` onto `keep`, then remove `drop` from `transport`."""
    # all_neighbors yields predecessors as well as successors on directed graphs; plain
    # `neighbors` would miss nodes that only have an edge pointing *to* `drop`.
    # Materialised first because edges are added while looping.
    adjacent = list(dict.fromkeys(nx.all_neighbors(transport, drop)))
    for neighbor in adjacent:
        # NOTE(review): `**edge_data` assumes a simple (Di)Graph, where get_edge_data returns
        # the attribute dict itself — confirm the PT builder never returns a multigraph.
        edge_data = transport.get_edge_data(neighbor, drop)
        if edge_data is not None:
            transport.add_edge(neighbor, keep, **edge_data)
        edge_data = transport.get_edge_data(drop, neighbor)
        if edge_data is not None:
            transport.add_edge(keep, neighbor, **edge_data)
    transport.remove_node(drop)


def _replace_walk_node(walk: nx.MultiDiGraph, edges_frame: pd.DataFrame, old_node, new_node) -> None:
    """Relabel `old_node` to `new_node` in `walk` in place and mirror the change in `edges_frame`."""
    nx.relabel_nodes(walk, {old_node: new_node}, copy=False)
    edges_frame.loc[edges_frame["u"] == old_node, "u"] = new_node
    edges_frame.loc[edges_frame["v"] == old_node, "v"] = new_node


def _add_walk_segment(walk: nx.MultiDiGraph, start, end, line: LineString, speed: float) -> None:
    """Add the walking segment `line` between `start` and `end` in both directions."""
    length_meter = round(line.length, 3)
    time_min = round(line.length / speed, 3)
    walk.add_edge(start, end, geometry=line, length_meter=length_meter, time_min=time_min, type="walk")
    walk.add_edge(end, start, geometry=line.reverse(), length_meter=length_meter, time_min=time_min, type="walk")


def _insert_platforms_on_edge(walk: nx.MultiDiGraph, edges_frame: pd.DataFrame, row: pd.Series, speed: float) -> None:
    """Attach all platforms of one `edges_frame` row to their walk edge, splitting it where needed."""
    u, v, k = row[["u", "v", "k"]]
    edge: LineString = walk.get_edge_data(u, v, k)["geometry"]

    # Visit the platforms in the order they appear along the edge (one platform is just the
    # shortest possible sequence, so it needs no separate code path).
    placements = sorted(
        ((edge.project(point), platform_id) for platform_id, point in zip(row["index"], row["geometry"])),
        key=lambda placement: placement[0],
    )

    # Current ids of the edge endpoints; they change when a platform replaces an endpoint.
    start_node, end_node = u, v
    # Last node already connected along the edge, and its distance from the edge start.
    last_node, last_dist = u, 0
    has_interior_split = False

    for dist, cur_index in placements:
        if dist == 0:
            # The platform projects onto the start of the edge: it replaces the start node.
            _replace_walk_node(walk, edges_frame, start_node, cur_index)
            if end_node == start_node:  # self-loop edge: both ends are the same node
                end_node = cur_index
            if last_node == start_node:
                last_node = cur_index
            start_node = cur_index
        elif dist == edge.length:
            # The platform projects onto the end of the edge: it replaces the end node.
            # The chain is deliberately not advanced here; the tail segment added below
            # links the last interior platform to this (relabeled) end node.
            _replace_walk_node(walk, edges_frame, end_node, cur_index)
            if start_node == end_node:
                start_node = cur_index
            if last_node == end_node:
                last_node = cur_index
            end_node = cur_index
        else:
            # Interior platform: add it as a node and connect it to the previous one.
            line = substring(edge, last_dist, dist)
            if isinstance(line, Point):
                raise ValueError(f"wtf {line} cannot be linestring")
            projected_point = edge.interpolate(dist)
            walk.add_node(cur_index, x=round(projected_point.x, 5), y=round(projected_point.y, 5))
            _add_walk_segment(walk, last_node, cur_index, line, speed)
            last_node, last_dist = cur_index, dist
            has_interior_split = True

    if not has_interior_split:
        # Platforms only replaced endpoints; the original edge stays valid as it is.
        return

    # Close the chain with the remainder of the edge, then drop the now-redundant original
    # edge right away, while its endpoint ids are still known to be current.
    _add_walk_segment(walk, last_node, end_node, substring(edge, last_dist, edge.length), speed)
    obsolete = [(start_node, end_node, k)]
    if start_node != end_node:
        obsolete.append((end_node, start_node, k))
    walk.remove_edges_from(obsolete)
[docs]
def get_intermodal_graph(
    *,
    osm_id: int | None = None,
    territory: Polygon | MultiPolygon | gpd.GeoDataFrame | None = None,
    clip_by_territory: bool = False,
    keep_edge_geometry: bool = True,
    osm_edge_tags: list[str] | None = None,
    max_dist: float = 30,
    keep_largest_subgraph: bool = True,
    walk_kwargs: dict[str, Any] | None = None,
    pt_kwargs: dict[str, Any] | None = None,
) -> nx.Graph:
    """
    Build an intermodal (public transport + walking) graph for a territory.

    A boundary is resolved from `osm_id` or `territory`, then the pedestrian graph
    (`get_walk_graph`) and the public-transport graph (`get_all_public_transport_graph`) are
    built concurrently in a thread pool. The two results are connected with
    `join_pt_walk_graph`, which snaps PT platforms to walk edges within `max_dist`.

    Parameters:
        osm_id (int | None): OSM relation/area id for the territory; provide this or `territory`.
        territory (Polygon | MultiPolygon | gpd.GeoDataFrame | None): Boundary geometry in EPSG:4326.
        clip_by_territory (bool): Default for the builders' `clip_by_territory` option.
        keep_edge_geometry (bool): Default for the builders' `keep_edge_geometry` option.
        osm_edge_tags (list[str] | None): Default for the builders' `osm_edge_tags` option;
            not forwarded at all when None.
        max_dist (float): Max distance in meters to connect PT platforms to walk edges.
        keep_largest_subgraph (bool): Passed to `join_pt_walk_graph`, and used as the default
            `keep_largest_subgraph` for the walk builder.
        walk_kwargs (dict[str, Any] | None): Extra keyword args for `get_walk_graph`.
        pt_kwargs (dict[str, Any] | None): Extra keyword args for `get_all_public_transport_graph`.

    Returns:
        (nx.Graph): The joined intermodal graph, or the walk graph alone when the
            public-transport graph has no nodes.

    Notes:
        - Values given explicitly in `walk_kwargs`/`pt_kwargs` win over the shared defaults
          (`keep_edge_geometry`, `osm_edge_tags`, `clip_by_territory`).
        - The caller's `walk_kwargs`/`pt_kwargs` dicts are copied, never modified.
        - An empty PT graph is reported with a warning.
    """
    boundary = get_4326_boundary(osm_id=osm_id, territory=territory)

    # Work on private copies so the dictionaries passed by the caller stay untouched.
    walk_options = dict(walk_kwargs or {})
    pt_options = dict(pt_kwargs or {})

    # Options shared by both builders; an explicit per-builder value takes precedence.
    shared_defaults: dict[str, Any] = {"keep_edge_geometry": keep_edge_geometry}
    if osm_edge_tags is not None:
        shared_defaults["osm_edge_tags"] = osm_edge_tags
    shared_defaults["clip_by_territory"] = clip_by_territory
    for options in (walk_options, pt_options):
        for option_name, option_value in shared_defaults.items():
            options.setdefault(option_name, option_value)
    walk_options.setdefault("keep_largest_subgraph", keep_largest_subgraph)

    # Both builders are submitted to the pool before either result is awaited.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        walk_task = pool.submit(get_walk_graph, territory=boundary, **walk_options)
        logger.debug("Started downloading and parsing walk graph...")
        pt_task = pool.submit(get_all_public_transport_graph, territory=boundary, **pt_options)
        logger.debug("Started downloading and parsing public transport graph...")
        pt_g = pt_task.result()
        logger.debug("Public transport graph done!")
        walk_g = walk_task.result()
        logger.debug("Walk graph done!")

    pt_is_empty = len(pt_g.nodes()) == 0
    if pt_is_empty:
        logger.warning("Public transport graph is empty! Returning only walk graph.")
        return walk_g

    return join_pt_walk_graph(pt_g, walk_g, max_dist=max_dist, keep_largest_subgraph=keep_largest_subgraph)