Source code for objectnat.methods.isochrones.isochrones

from typing import Any, Literal

import geopandas as gpd
import networkx as nx
import numpy as np

from objectnat import config
from objectnat.methods.isochrones.isochrone_utils import (
    _calculate_distance_matrix,
    _create_isochrones_gdf,
    _prepare_graph_and_nodes,
    _process_pt_data,
    _validate_inputs,
    create_separated_dist_polygons,
)
from objectnat.methods.utils.geom_utils import remove_inner_geom
from objectnat.methods.utils.graph_utils import graph_to_gdf

logger = config.logger


[docs] def get_accessibility_isochrone_stepped( isochrone_type: Literal["radius", "ways", "separate"], point: gpd.GeoDataFrame, weight_value: float, weight_type: Literal["time_min", "length_meter"], nx_graph: nx.Graph, step: float = None, **kwargs: Any, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]: """ Calculate stepped accessibility isochrones for a single point with specified intervals. Args: isochrone_type: Visualization method for stepped isochrones: - ``"radius"``: Voronoi-based in circular buffers - ``"ways"``: Voronoi-based in road network polygons - ``"separate"``: Circular buffers for each step point (gpd.GeoDataFrame): Single source point for isochrone calculation (uses first geometry if multiple provided). weight_value (float): Maximum travel time (minutes) or distance (meters) threshold. weight_type: Type of weight calculation: - "time_min": Time-based in minutes - "length_meter": Distance-based in meters nx_graph (nx.Graph): NetworkX graph representing the transportation network. step (float, optional): Interval between isochrone steps. Defaults to: - 100 meters for distance-based - 1 minute for time-based **kwargs: Additional parameters: - buffer_factor: Size multiplier for buffers (default: 0.7) - road_buffer_size: Buffer size for road edges in meters (default: 5) Returns: tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]: Tuple containing: - stepped_isochrones: GeoDataFrame with stepped polygons and distance/time attributes - pt_stops: Public transport stops within isochrones (if available) - pt_routes: Public transport routes within isochrones (if available) """ buffer_params = { "buffer_factor": 0.7, "road_buffer_size": 5, } buffer_params.update(kwargs) original_crs = point.crs point = point.copy() if len(point) > 1: logger.warning( f"This method processes only single point. The GeoDataFrame contains {len(point)} points - " "only the first geometry will be used for isochrone calculation. " ) point = point.iloc[[0]] local_crs, graph_type = _validate_inputs(point, weight_value, weight_type, nx_graph) if step is None: if weight_type == "length_meter": step = 100 else: step = 1 nx_graph, points, dist_nearest, speed = _prepare_graph_and_nodes( point, nx_graph, graph_type, weight_type, weight_value ) dist_matrix, subgraph = _calculate_distance_matrix( nx_graph, points["nearest_node"].values, weight_type, weight_value, dist_nearest ) logger.info("Building isochrones geometry...") nodes, edges = graph_to_gdf(subgraph) nodes.loc[dist_matrix.columns, "dist"] = dist_matrix.iloc[0] if isochrone_type == "separate": stepped_iso = create_separated_dist_polygons(nodes, weight_value, weight_type, step, speed) else: if isochrone_type == "radius": isochrone_geoms = _build_radius_isochrones( dist_matrix, weight_value, weight_type, speed, nodes, buffer_params["buffer_factor"] ) else: # isochrone_type == 'ways': if graph_type in ["intermodal", "walk"]: isochrone_edges = edges[edges["type"] == "walk"] else: isochrone_edges = edges.copy() all_isochrones_edges = isochrone_edges.buffer(buffer_params["road_buffer_size"], resolution=1).union_all() all_isochrones_edges = gpd.GeoDataFrame(geometry=[all_isochrones_edges], crs=local_crs) isochrone_geoms = _build_ways_isochrones( dist_matrix=dist_matrix, weight_value=weight_value, weight_type=weight_type, speed=speed, nodes=nodes, all_isochrones_edges=all_isochrones_edges, buffer_factor=buffer_params["buffer_factor"], ) nodes = nodes.clip(isochrone_geoms[0], keep_geom_type=True) nodes["dist"] = np.minimum(np.ceil(nodes["dist"] / step) * step, weight_value) voronois = gpd.GeoDataFrame(geometry=nodes.voronoi_polygons(), crs=local_crs) stepped_iso = ( voronois.sjoin(nodes[["dist", "geometry"]]).dissolve(by="dist", as_index=False).drop(columns="index_right") ) stepped_iso = stepped_iso.clip(isochrone_geoms[0], keep_geom_type=True) pt_nodes, pt_edges = _process_pt_data(nodes, edges, graph_type) if pt_nodes is not None: pt_nodes.to_crs(original_crs, inplace=True) if pt_edges is not None: pt_edges.to_crs(original_crs, inplace=True) return stepped_iso.to_crs(original_crs), pt_nodes, pt_edges
[docs] def get_accessibility_isochrones( isochrone_type: Literal["radius", "ways"], points: gpd.GeoDataFrame, weight_value: float, weight_type: Literal["time_min", "length_meter"], nx_graph: nx.Graph, **kwargs: Any, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]: """ Calculate accessibility isochrones from input points based on the provided city graph. Supports two types of isochrones: - 'radius': Circular buffer-based isochrones - 'ways': Road network-based isochrones Args: isochrone_type: Type of isochrone to calculate: - "radius": Creates circular buffers around reachable nodes - "ways": Creates polygons based on reachable road network points (gpd.GeoDataFrame): GeoDataFrame containing source points for isochrone calculation. weight_value (float): Maximum travel time (minutes) or distance (meters) threshold. weight_type: Type of weight calculation: - "time_min": Time-based accessibility in minutes - "length_meter": Distance-based accessibility in meters nx_graph (nx.Graph): NetworkX graph representing the transportation network. Must contain CRS and speed attributes for time calculations. **kwargs: Additional parameters: - buffer_factor: Size multiplier for buffers (default: 0.7) - road_buffer_size: Buffer size for road edges in meters (default: 5) Returns: tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]: Tuple containing: - isochrones: GeoDataFrame with calculated isochrone polygons - pt_stops: Public transport stops within isochrones (if available) - pt_routes: Public transport routes within isochrones (if available) """ buffer_params = { "buffer_factor": 0.7, "road_buffer_size": 5, } original_crs = points.crs buffer_params.update(kwargs) points = points.copy() local_crs, graph_type = _validate_inputs(points, weight_value, weight_type, nx_graph) nx_graph, points, dist_nearest, speed = _prepare_graph_and_nodes( points, nx_graph, graph_type, weight_type, weight_value ) weight_cutoff = ( weight_value + (100 if weight_type == "length_meter" else 1) if isochrone_type == "ways" else weight_value ) dist_matrix, subgraph = _calculate_distance_matrix( nx_graph, points["nearest_node"].values, weight_type, weight_cutoff, dist_nearest ) logger.info("Building isochrones geometry...") nodes, edges = graph_to_gdf(subgraph) if isochrone_type == "radius": isochrone_geoms = _build_radius_isochrones( dist_matrix, weight_value, weight_type, speed, nodes, buffer_params["buffer_factor"] ) else: # isochrone_type == 'ways': if graph_type in ["intermodal", "walk"]: isochrone_edges = edges[edges["type"] == "walk"] else: isochrone_edges = edges.copy() all_isochrones_edges = isochrone_edges.buffer(buffer_params["road_buffer_size"], resolution=1).union_all() all_isochrones_edges = gpd.GeoDataFrame(geometry=[all_isochrones_edges], crs=local_crs) isochrone_geoms = _build_ways_isochrones( dist_matrix=dist_matrix, weight_value=weight_value, weight_type=weight_type, speed=speed, nodes=nodes, all_isochrones_edges=all_isochrones_edges, buffer_factor=buffer_params["buffer_factor"], ) isochrones = _create_isochrones_gdf(points, isochrone_geoms, dist_matrix, local_crs, weight_type, weight_value) pt_nodes, pt_edges = _process_pt_data(nodes, edges, graph_type) if pt_nodes is not None: pt_nodes.to_crs(original_crs, inplace=True) if pt_edges is not None: pt_edges.to_crs(original_crs, inplace=True) return isochrones.to_crs(original_crs), pt_nodes, pt_edges
def _build_radius_isochrones(dist_matrix, weight_value, weight_type, speed, nodes, buffer_factor): results = [] for source in dist_matrix.index: buffers = (weight_value - dist_matrix.loc[source]) * buffer_factor if weight_type == "time_min": buffers = buffers * speed buffers = nodes.merge(buffers, left_index=True, right_index=True) buffers.geometry = buffers.geometry.buffer(buffers[source], resolution=8) results.append(buffers.union_all()) return results def _build_ways_isochrones(dist_matrix, weight_value, weight_type, speed, nodes, all_isochrones_edges, buffer_factor): results = [] for source in dist_matrix.index: reachable_nodes = dist_matrix.loc[source] reachable_nodes = reachable_nodes[reachable_nodes <= weight_value] reachable_nodes = (weight_value - reachable_nodes) * buffer_factor if weight_type == "time_min": reachable_nodes = reachable_nodes * speed reachable_nodes = nodes.merge(reachable_nodes, left_index=True, right_index=True) clip_zone = reachable_nodes.buffer(reachable_nodes[source], resolution=4).union_all() isochrone_edges = all_isochrones_edges.clip(clip_zone, keep_geom_type=True).explode(ignore_index=True) geom_to_keep = isochrone_edges.sjoin(reachable_nodes, how="inner").index.unique() isochrone = remove_inner_geom(isochrone_edges.loc[geom_to_keep].union_all()) results.append(isochrone) return results