Source code for objectnat.methods.coverage_zones.stepped_coverage

from typing import Literal

import geopandas as gpd
import networkx as nx
import numpy as np
import pandas as pd
from pyproj.exceptions import CRSError
from shapely import Point, concave_hull

from objectnat.methods.isochrones.isochrone_utils import create_separated_dist_polygons
from objectnat.methods.utils.graph_utils import get_closest_nodes_from_gdf, reverse_graph


[docs] def get_stepped_graph_coverage( gdf_to: gpd.GeoDataFrame, nx_graph: nx.Graph, weight_type: Literal["time_min", "length_meter"], step_type: Literal["voronoi", "separate"], weight_value_cutoff: float = None, zone: gpd.GeoDataFrame = None, step: float = None, ): """ Calculate stepped coverage zones from source points through a graph network using Dijkstra's algorithm and Voronoi-based or buffer-based isochrone steps. This function combines graph-based accessibility with stepped isochrone logic. It: 1. Finds nearest graph nodes for each input point 2. Computes reachability for increasing weights (e.g. time or distance) in defined steps 3. Generates Voronoi-based or separate buffer zones around network nodes 4. Aggregates zones into stepped coverage layers 5. Optionally clips results to a boundary zone Args: gdf_to (gpd.GeoDataFrame): Source points from which stepped coverage is calculated. nx_graph (nx.Graph): NetworkX graph representing the transportation network. weight_type: Type of edge weight to use for path calculation: - ``"time_min"``: Edge travel time in minutes - ``"length_meter"``: Edge length in meters step_type: Method for generating stepped zones: - ``"voronoi"``: Stepped zones based on Voronoi polygons around graph nodes - ``"separate"``: Independent buffer zones per step weight_value_cutoff (float, optional): Maximum weight value (e.g., max travel time or distance) to limit the coverage extent. zone (gpd.GeoDataFrame, optional): Optional boundary polygon to clip resulting stepped zones. If None, concave hull of reachable area is used. step (float, optional): Step interval for coverage zone construction. Defaults to: - 100 meters for distance-based weight - 1 minute for time-based weight Returns: gpd.GeoDataFrame: GeoDataFrame with polygons representing stepped coverage zones for each input point, annotated by step range. Notes: - Input graph must have a valid CRS defined. - MultiGraph or MultiDiGraph inputs will be simplified to Graph/DiGraph. - Designed for accessibility and spatial equity analyses over multimodal networks. """ if step is None: if weight_type == "length_meter": step = 100 else: step = 1 original_crs = gdf_to.crs try: local_crs = nx_graph.graph["crs"] except KeyError as exc: raise ValueError("Graph does not have crs attribute") from exc try: points = gdf_to.copy() points.to_crs(local_crs, inplace=True) except CRSError as e: raise CRSError(f"Graph crs ({local_crs}) has invalid format.") from e nx_graph, reversed_graph = reverse_graph(nx_graph, weight_type) points.geometry = points.representative_point() distances, nearest_nodes = get_closest_nodes_from_gdf(points, nx_graph) points["nearest_node"] = nearest_nodes points["distance"] = distances dist = nx.multi_source_dijkstra_path_length( reversed_graph, nearest_nodes, weight=weight_type, cutoff=weight_value_cutoff ) graph_points = pd.DataFrame( data=[{"node": node, "geometry": Point(data["x"], data["y"])} for node, data in nx_graph.nodes(data=True)] ) nearest_nodes = pd.DataFrame.from_dict(dist, orient="index", columns=["dist"]).reset_index() graph_nodes_gdf = gpd.GeoDataFrame( graph_points.merge(nearest_nodes, left_on="node", right_on="index", how="left").reset_index(drop=True), geometry="geometry", crs=local_crs, ) graph_nodes_gdf.drop(columns=["index", "node"], inplace=True) if weight_value_cutoff is None: weight_value_cutoff = max(nearest_nodes["dist"]) if step_type == "voronoi": graph_nodes_gdf["dist"] = np.minimum(np.ceil(graph_nodes_gdf["dist"] / step) * step, weight_value_cutoff) voronois = gpd.GeoDataFrame(geometry=graph_nodes_gdf.voronoi_polygons(), crs=local_crs) zone_coverages = voronois.sjoin(graph_nodes_gdf).dissolve(by="dist", as_index=False, dropna=False) zone_coverages = zone_coverages[["dist", "geometry"]].explode(ignore_index=True) if zone is None: zone = concave_hull(graph_nodes_gdf[~graph_nodes_gdf["node_to"].isna()].union_all(), ratio=0.5) else: zone = zone.to_crs(local_crs) zone_coverages = zone_coverages.clip(zone).to_crs(original_crs) else: # step_type == 'separate': speed = 83.33 # TODO HARDCODED WALK SPEED weight_value = weight_value_cutoff zone_coverages = create_separated_dist_polygons(graph_nodes_gdf, weight_value, weight_type, step, speed) if zone is not None: zone = zone.to_crs(local_crs) zone_coverages = zone_coverages.clip(zone).to_crs(original_crs) return zone_coverages