Skip to content

API Reference

objectnat

ObjectNat is an open-source library created for geospatial analysis created by IDU team.

Homepage https://github.com/DDonnyy/ObjectNat.

calculate_simplified_noise_frame(noise_sources: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, air_temperature, **kwargs) -> gpd.GeoDataFrame

Calculates a simplified environmental noise frame using static noise source geometries without simulating full sound wave propagation or reflections.

This function provides a fast approximation of noise dispersion from a variety of source geometries, including points (e.g., traffic noise measurement points), lines (e.g., roads or railways), and polygons (e.g., industrial zones or buildings). Instead of simulating detailed wave interactions and reflections, it constructs an envelope of potential noise exposure by buffering the source geometry and applying simplified decay formulas based on sound power, frequency and temperature.

Parameters:

Name Type Description Default
noise_sources GeoDataFrame

A GeoDataFrame containing geometries of noise sources (Point, LineString, or Polygon). Each feature must have the following two columns: - 'source_noise_db': Initial sound level at the source, in decibels (dB). - 'geometric_mean_freq_hz': Characteristic sound frequency (Hz) used to model distance-based attenuation. Values in 'source_noise_db' must not exceed the physical maximum of 194 dB. Missing or NaN values in required fields will raise an error.

required
obstacles GeoDataFrame

A GeoDataFrame representing physical obstructions in the environment (e.g., buildings, walls, terrain). These are used to build visibility masks that affect where sound can propagate. Geometry will be simplified for performance using a default tolerance of 1 unit.

required
air_temperature float

The ambient air temperature in degrees Celsius. This value influences the attenuation model of sound in the atmosphere. Temperatures significantly outside the typical 0–30°C range may lead to inaccurate results.

required
Optional kwargs
  • target_noise_db (float, optional): The minimum sound level threshold (in dB) to be modeled. Any value below this threshold is considered insignificant and will be excluded from the resulting noise frame. Default is 40 dB.
  • db_sim_step (float, optional): The simulation step size (in dB) used to discretize sound levels into spatial layers. Default is 5. Smaller values produce more detailed output but increase computation time.
  • linestring_point_radius (float, optional): The spacing radius (in meters) used when converting LineString geometries into distributed point sources for simulation. Default is 30. Reducing this value improves detail along long lines.
  • polygon_point_radius (float, optional): The point spacing (in meters) for distributing sources within Polygon geometries. Default is 15. Points are sampled across the polygon’s surface and perimeter to represent the full sound-emitting area.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame representing simplified noise distribution areas. The output geometries are polygons where each polygon is associated with the maximum sound level (in dB) present in that area, as derived from overlapping source zones. The resulting data is dissolved by noise level and returned in the original coordinate reference system (CRS) of the input sources.

Notes
  • The function does not model reflections or complex diffraction effects. It uses straight-line visibility (line-of-sight) and a layered distance-decay approach for rapid estimation.
  • Obstacles are used for visibility masking only, not as reflectors or absorbers.
  • Output resolution and accuracy depend heavily on the geometry type and point distribution settings.
  • Results are useful for quick noise mapping or for generating initial noise envelopes prior to more detailed simulations.
Source code in src\objectnat\methods\noise\noise_simulation_simplified.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def calculate_simplified_noise_frame(
    noise_sources: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, air_temperature, **kwargs
) -> gpd.GeoDataFrame:
    """
    Calculates a simplified environmental noise frame using static noise source geometries without simulating
    full sound wave propagation or reflections.

    This function provides a fast approximation of noise dispersion from a variety of source geometries, including
    points (e.g., traffic noise measurement points), lines (e.g., roads or railways), and polygons (e.g., industrial
    zones or buildings). Instead of simulating detailed wave interactions and reflections, it constructs an
    envelope of potential noise exposure by buffering the source geometry and applying simplified decay formulas
    based on sound power, frequency and temperature.

    Args:
        noise_sources (gpd.GeoDataFrame): A GeoDataFrame containing geometries of noise sources (Point, LineString,
            or Polygon). Each feature must have the following two columns:
                - 'source_noise_db': Initial sound level at the source, in decibels (dB).
                - 'geometric_mean_freq_hz': Characteristic sound frequency (Hz) used to model distance-based
                  attenuation.
            Values in 'source_noise_db' must not exceed the physical maximum of 194 dB. Missing or NaN values in
            required fields will raise an error.

        obstacles (gpd.GeoDataFrame): A GeoDataFrame representing physical obstructions in the environment
            (e.g., buildings, walls, terrain). These are used to build visibility masks that affect where sound can
            propagate. Geometry will be simplified for performance using a default tolerance of 1 unit.

        air_temperature (float): The ambient air temperature in degrees Celsius. This value influences the
            attenuation model of sound in the atmosphere. Temperatures significantly outside the typical 0–30°C
            range may lead to inaccurate results.

    Optional kwargs:
        - target_noise_db (float, optional): The minimum sound level threshold (in dB) to be modeled. Any value below
            this threshold is considered insignificant and will be excluded from the resulting noise frame.
            Default is 40 dB.
        - db_sim_step (float, optional): The simulation step size (in dB) used to discretize sound levels into
            spatial layers. Default is 5. Smaller values produce more detailed output but increase computation time.
        - linestring_point_radius (float, optional): The spacing radius (in meters) used when converting LineString
            geometries into distributed point sources for simulation. Default is 30. Reducing this value improves
            detail along long lines.
        - polygon_point_radius (float, optional): The point spacing (in meters) for distributing sources within
            Polygon geometries. Default is 15. Points are sampled across the polygon’s surface and perimeter to
            represent the full sound-emitting area.

    Returns:
        (gpd.GeoDataFrame): A GeoDataFrame representing simplified noise distribution areas. The output geometries
            are polygons where each polygon is associated with the maximum sound level (in dB) present in that area,
            as derived from overlapping source zones. The resulting data is dissolved by noise level and returned in
            the original coordinate reference system (CRS) of the input sources.

    Notes:
        - The function does not model reflections or complex diffraction effects. It uses straight-line
          visibility (line-of-sight) and a layered distance-decay approach for rapid estimation.
        - Obstacles are used for visibility masking only, not as reflectors or absorbers.
        - Output resolution and accuracy depend heavily on the geometry type and point distribution settings.
        - Results are useful for quick noise mapping or for generating initial noise envelopes prior to more
          detailed simulations.
    """
    target_noise_db = kwargs.get("target_noise_db", 40)
    db_sim_step = kwargs.get("db_sim_step", 5)
    linestring_point_radius = kwargs.get("linestring_point_radius", 30)
    polygon_point_radius = kwargs.get("polygon_point_radius", 15)

    required_columns = ["source_noise_db", "geometric_mean_freq_hz"]
    for col in required_columns:
        if col not in noise_sources.columns:
            raise ValueError(f"'{col}' column is missing in provided GeoDataFrame")
        if noise_sources[col].isnull().any():
            raise ValueError(f"Column '{col}' contains missing (NaN) values")
    if (noise_sources["source_noise_db"] > MAX_DB_VALUE).any():
        raise ValueError(
            f"One or more values in 'source_noise_db' column exceed the physical limit of {MAX_DB_VALUE} dB."
        )
    original_crs = noise_sources.crs
    if len(obstacles) > 0:
        obstacles = obstacles.copy()
        obstacles.geometry = obstacles.geometry.simplify(tolerance=1)
        local_crs = obstacles.estimate_utm_crs()
        obstacles.to_crs(local_crs, inplace=True)
        noise_sources.to_crs(local_crs, inplace=True)
    else:
        local_crs = noise_sources.estimate_utm_crs()
        noise_sources.to_crs(local_crs, inplace=True)
        noise_sources.reset_index(drop=True)

    noise_sources = noise_sources.explode(ignore_index=True)
    noise_sources["geom_type"] = noise_sources.geom_type

    grouped_sources = noise_sources.groupby(by=["source_noise_db", "geometric_mean_freq_hz", "geom_type"])

    frame_result = []
    total_tasks = 0
    with tqdm(total=total_tasks, desc="Simulating noise") as pbar:
        for (source_db, freq_hz, geom_type), group_gdf in grouped_sources:
            # calculating layer dist and db values
            dist_db = [(0, source_db)]
            cur_db = source_db - db_sim_step
            max_dist = 0
            while cur_db > target_noise_db - db_sim_step:
                if cur_db - db_sim_step < target_noise_db:
                    cur_db = target_noise_db
                max_dist = dist_to_target_db(source_db, cur_db, freq_hz, air_temperature)
                dist_db.append((max_dist, cur_db))
                cur_db -= db_sim_step

            # increasing max_dist for extra view
            max_dist = max_dist * 1.2

            if geom_type == "Point":
                total_tasks += len(group_gdf)
                pbar.total = total_tasks
                pbar.refresh()
                for _, row in group_gdf.iterrows():
                    point_from = row.geometry
                    point_buffer = point_from.buffer(max_dist, resolution=16)
                    local_obstacles = obstacles[obstacles.intersects(point_buffer)]
                    vis_poly = get_visibility_accurate(point_from, obstacles=local_obstacles, view_distance=max_dist)
                    noise_from_feature = _eval_donuts_gdf(point_from, dist_db, local_crs, vis_poly)
                    frame_result.append(noise_from_feature)
                    pbar.update(1)

            elif geom_type == "LineString":
                layer_points = distribute_points_on_linestrings(
                    group_gdf, radius=linestring_point_radius, lloyd_relax_n=1
                )
                total_tasks += len(layer_points)
                pbar.total = total_tasks
                pbar.refresh()
                noise_from_feature = _process_lines_or_polygons(
                    group_gdf, max_dist, obstacles, layer_points, dist_db, local_crs, pbar
                )
                frame_result.append(noise_from_feature)
            elif geom_type == "Polygon":
                group_gdf.geometry = group_gdf.buffer(0.1, resolution=1)
                layer_points = distribute_points_on_polygons(
                    group_gdf, only_exterior=False, radius=polygon_point_radius, lloyd_relax_n=1
                )
                total_tasks += len(layer_points)
                pbar.total = total_tasks
                pbar.refresh()
                noise_from_feature = _process_lines_or_polygons(
                    group_gdf, max_dist, obstacles, layer_points, dist_db, local_crs, pbar
                )
                frame_result.append(noise_from_feature)
            else:
                pass

    noise_gdf = gpd.GeoDataFrame(pd.concat(frame_result, ignore_index=True), crs=local_crs)
    polygons = gpd.GeoDataFrame(
        geometry=list(polygonize(noise_gdf.geometry.apply(polygons_to_multilinestring).union_all())), crs=local_crs
    )
    polygons_points = polygons.copy()
    polygons_points.geometry = polygons.representative_point()
    sim_result = polygons_points.sjoin(noise_gdf, predicate="within").reset_index()
    sim_result = sim_result.groupby("index").agg({"noise_level": "max"})
    sim_result["geometry"] = polygons
    sim_result = (
        gpd.GeoDataFrame(sim_result, geometry="geometry", crs=local_crs).dissolve(by="noise_level").reset_index()
    )

    return sim_result.to_crs(original_crs)

calculate_visibility_catchment_area(points: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: int | float, max_workers: int = cpu_count()) -> gpd.GeoDataFrame

Calculate visibility catchment areas for a large urban area based on given points and obstacles. This function is designed to work with at least 1000 points spaced 10-20 meters apart for optimal results. Points can be generated using a road graph.

Parameters:

Name Type Description Default
points GeoDataFrame

GeoDataFrame containing the points from which visibility is calculated.

required
obstacles GeoDataFrame

GeoDataFrame containing the obstacles that block visibility.

required
view_distance int | float

The maximum distance from each point within which visibility is calculated.

required
max_workers int

Maximum workers in multiproccesing, multipocessing.cpu_count() by default.

cpu_count()

Returns:

Type Description
GeoDataFrame

GeoDataFrame containing the calculated visibility catchment areas.

Source code in src\objectnat\methods\visibility\visibility_analysis.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def calculate_visibility_catchment_area(
    points: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: int | float, max_workers: int = cpu_count()
) -> gpd.GeoDataFrame:  # pragma: no cover
    """
    Calculate visibility catchment areas for a large urban area based on given points and obstacles.
    This function is designed to work with at least 1000 points spaced 10-20 meters apart for optimal results.
    Points can be generated using a road graph.

    Parameters:
        points (gpd.GeoDataFrame): GeoDataFrame containing the points from which visibility is calculated.
        obstacles (gpd.GeoDataFrame): GeoDataFrame containing the obstacles that block visibility.
        view_distance (int | float): The maximum distance from each point within which visibility is calculated.
        max_workers (int): Maximum workers in multiproccesing, multipocessing.cpu_count() by default.

    Returns:
        (gpd.GeoDataFrame): GeoDataFrame containing the calculated visibility catchment areas.
    """

    def filter_geoms(x):
        if x.geom_type == "GeometryCollection":
            return MultiPolygon([y for y in x.geoms if y.geom_type in ["Polygon", "MultiPolygon"]])
        return x

    def calc_group_factor(x):
        # pylint: disable-next=redefined-outer-name,reimported,import-outside-toplevel
        import numpy as np

        return np.mean(x.new_ratio) * x.count_n

    def unary_union_groups(x):
        # pylint: disable-next=redefined-outer-name,reimported,import-outside-toplevel
        from shapely import MultiPolygon

        # pylint: disable-next=redefined-outer-name,reimported,import-outside-toplevel
        from shapely.ops import unary_union

        return unary_union(MultiPolygon(list(x["geometry"])).buffer(0))

    pandarallel.initialize(progress_bar=True, verbose=0)

    local_crs = obstacles.estimate_utm_crs()
    obstacles = obstacles.to_crs(local_crs)
    points = points.to_crs(local_crs)

    sectors_n = 12
    logger.info("Calculating Visibility Catchment Area from each point")
    all_visions_sectorized = get_visibilities_from_points(points, obstacles, view_distance, sectors_n, max_workers)
    all_visions_sectorized = gpd.GeoDataFrame(
        geometry=[item for sublist in all_visions_sectorized for item in sublist], crs=local_crs
    )
    logger.info("Calculating non-vision part...")
    all_visions_unary = all_visions_sectorized.union_all()
    convex = all_visions_unary.convex_hull
    dif = convex.difference(all_visions_unary)

    del convex, all_visions_unary

    buf_area = (math.pi * view_distance**2) / sectors_n
    all_visions_sectorized["ratio"] = all_visions_sectorized.area / buf_area
    all_visions_sectorized["ratio"] = min_max_normalization(
        all_visions_sectorized["ratio"].values, new_min=1, new_max=10
    )
    groups = all_visions_sectorized.sample(frac=1).groupby(all_visions_sectorized.index // 6000)
    groups = [group for _, group in groups]

    del all_visions_sectorized

    groups_result = process_map(
        _process_group,
        groups,
        desc="Counting intersections in each group...",
        max_workers=max_workers,
    )
    logger.info("Calculating all groups intersection...")
    all_in = combine_geometry(gpd.GeoDataFrame(data=pd.concat(groups_result), geometry="geometry", crs=local_crs))

    del groups_result

    all_in["count_n"] = all_in["index_right"].apply(len)

    logger.info("Calculating intersection's parameters")
    all_in["factor"] = all_in.parallel_apply(calc_group_factor, axis=1)
    threshold = all_in["factor"].quantile(0.3)
    all_in = all_in[all_in["factor"] > threshold]

    all_in["factor_normalized"] = np.round(
        min_max_normalization(np.sqrt(all_in["factor"].values), new_min=1, new_max=5)
    ).astype(int)
    logger.info("Calculating normalized groups geometry...")
    all_in = all_in.groupby("factor_normalized").parallel_apply(unary_union_groups).reset_index()
    all_in = gpd.GeoDataFrame(data=all_in.rename(columns={0: "geometry"}), geometry="geometry", crs=32636)

    all_in = all_in.explode(index_parts=True).reset_index(drop=True)
    all_in["area"] = all_in.area
    threshold = all_in["area"].quantile(0.9)
    all_in = all_in[all_in["area"] > threshold]
    all_in = all_in.groupby("factor_normalized").apply(unary_union_groups).reset_index()
    all_in = gpd.GeoDataFrame(data=all_in.rename(columns={0: "geometry"}), geometry="geometry", crs=32636)

    all_in.geometry = all_in.geometry.buffer(20).buffer(-20).difference(dif)

    all_in.sort_values(by="factor_normalized", ascending=False, inplace=True)
    all_in.reset_index(drop=True, inplace=True)
    logger.info("Smoothing normalized groups geometry...")
    for ind, row in all_in.iloc[:-1].iterrows():
        for ind2 in range(ind + 1, len(all_in)):
            current_geometry = all_in.at[ind2, "geometry"]
            all_in.at[ind2, "geometry"] = current_geometry.difference(row.geometry)
            all_in["geometry"] = all_in["geometry"].apply(filter_geoms)

    all_in = all_in.explode(index_parts=True)
    logger.info("Done!")
    return all_in

gdf_to_graph(gdf: gpd.GeoDataFrame, project_gdf_attr=True, reproject_to_utm_crs=True, speed=5, check_intersections=True) -> nx.DiGraph

Converts a GeoDataFrame of LineStrings into a directed graph (nx.DiGraph).

This function transforms a set of linear geometries (which may or may not form a planar graph) into a directed graph where each edge corresponds to a LineString (or its segment) from the GeoDataFrame. Intersections are optionally checked and merged. Attributes from the original GeoDataFrame can be projected onto the graph edges using spatial matching.

Parameters:

Name Type Description Default
gdf GeoDataFrame

A GeoDataFrame containing at least one LineString geometry.

required
project_gdf_attr bool

If True, attributes from the input GeoDataFrame will be spatially projected onto the resulting graph edges. This can be an expensive operation for large datasets.

True
reproject_to_utm_crs bool

If True, reprojects the GeoDataFrame to the estimated local UTM CRS to ensure accurate edge length calculations in meters. If False, edge lengths are still computed in UTM CRS, but the final graph will remain in the original CRS of the input GeoDataFrame.

True
speed float

Assumed travel speed in km/h used to compute edge traversal time (in minutes).

5
check_intersections bool

If True, merges geometries to ensure topological correctness. Can be disabled if the input geometries already form a proper planar graph with no unintended intersections.

True

Returns:

Type Description
DiGraph

A directed graph where each edge corresponds to a line segment from the input GeoDataFrame. Edge attributes include geometry, length in meters, travel time (in minutes), and any additional projected attributes from the original GeoDataFrame.

Raises:

Type Description
ValueError

If the input GeoDataFrame contains no valid LineStrings.

Source code in src\objectnat\methods\utils\graph_utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def gdf_to_graph(
    gdf: gpd.GeoDataFrame, project_gdf_attr=True, reproject_to_utm_crs=True, speed=5, check_intersections=True
) -> nx.DiGraph:
    """
    Converts a GeoDataFrame of LineStrings into a directed graph (nx.DiGraph).

    This function transforms a set of linear geometries (which may or may not form a planar graph)
    into a directed graph where each edge corresponds to a LineString (or its segment) from the GeoDataFrame.
    Intersections are optionally checked and merged. Attributes from the original GeoDataFrame
    can be projected onto the graph edges using spatial matching.

    Parameters:
        gdf (gpd.GeoDataFrame): A GeoDataFrame containing at least one LineString geometry.
        project_gdf_attr (bool): If True, attributes from the input GeoDataFrame will be spatially
            projected onto the resulting graph edges. This can be an expensive operation for large datasets.
        reproject_to_utm_crs (bool): If True, reprojects the GeoDataFrame to the estimated local UTM CRS to ensure
            accurate edge length calculations in meters. If False, edge lengths are still computed in UTM CRS,
            but the final graph will remain in the original CRS of the input GeoDataFrame.
        speed (float): Assumed travel speed in km/h used to compute edge traversal time (in minutes).
        check_intersections (bool): If True, merges geometries to ensure topological correctness.
            Can be disabled if the input geometries already form a proper planar graph with no unintended intersections.

    Returns:
        (nx.DiGraph): A directed graph where each edge corresponds to a line segment from the input GeoDataFrame.
            Edge attributes include geometry, length in meters, travel time (in minutes), and any additional projected
            attributes from the original GeoDataFrame.

    Raises:
        ValueError: If the input GeoDataFrame contains no valid LineStrings.
    """

    def unique_list(agg_vals):
        agg_vals = list(set(agg_vals.dropna()))
        if len(agg_vals) == 1:
            return agg_vals[0]
        return agg_vals

    original_crs = gdf.crs
    gdf = gdf.to_crs(gdf.estimate_utm_crs())

    gdf = gdf.explode(ignore_index=True)
    gdf = gdf[gdf.geom_type == "LineString"]

    if len(gdf) == 0:
        raise ValueError("Provided GeoDataFrame contains no valid LineStrings")

    if check_intersections:
        lines = line_merge(node(MultiLineString(gdf.geometry.to_list())))
    else:
        lines = line_merge(MultiLineString(gdf.geometry.to_list()))

    lines = gpd.GeoDataFrame(geometry=list(lines.geoms), crs=gdf.crs)

    if len(gdf.columns) > 1 and project_gdf_attr:
        lines_centroids = lines.copy()
        lines_centroids.geometry = lines_centroids.apply(
            lambda row: row.geometry.line_interpolate_point(row.geometry.length / 2), axis=1
        ).buffer(0.05, resolution=2)
        lines_with_attrs = gpd.sjoin(lines_centroids, gdf, how="left", predicate="intersects")
        aggregated_attrs = (
            lines_with_attrs.drop(columns=["geometry", "index_right"])  # убрать геометрию буфера
            .groupby(lines_with_attrs.index)
            .agg(unique_list)
        )
        lines = pd.concat([lines, aggregated_attrs], axis=1)

    lines["length_meter"] = np.round(lines.length, 2)
    if not reproject_to_utm_crs:
        lines = lines.to_crs(original_crs)

    coords = lines.geometry.get_coordinates()
    coords_grouped_by_index = coords.reset_index(names="old_index").groupby("old_index")
    start_coords = coords_grouped_by_index.head(1).apply(lambda a: (a.x, a.y), axis=1).rename("start")
    end_coords = coords_grouped_by_index.tail(1).apply(lambda a: (a.x, a.y), axis=1).rename("end")
    coords = pd.concat([start_coords.reset_index(), end_coords.reset_index()], axis=1)[["start", "end"]]
    lines = pd.concat([lines, coords], axis=1)
    unique_coords = pd.concat([coords["start"], coords["end"]], ignore_index=True).unique()
    coord_to_index = {coord: idx for idx, coord in enumerate(unique_coords)}

    lines["u"] = lines["start"].map(coord_to_index)
    lines["v"] = lines["end"].map(coord_to_index)

    speed = speed * 1000 / 60
    lines["time_min"] = np.round(lines["length_meter"] / speed, 2)

    graph = nx.Graph()
    for coords, node_id in coord_to_index.items():
        x, y = coords
        graph.add_node(node_id, x=float(x), y=float(y))

    columns_to_attr = lines.columns.difference(["start", "end", "u", "v"])
    for _, row in lines.iterrows():
        edge_attrs = {}
        for col in columns_to_attr:
            edge_attrs[col] = row[col]
        graph.add_edge(row.u, row.v, **edge_attrs)

    graph.graph["crs"] = lines.crs
    graph.graph["speed m/min"] = speed
    return nx.DiGraph(graph)

get_accessibility_isochrone_stepped(isochrone_type: Literal['radius', 'ways', 'separate'], point: gpd.GeoDataFrame, weight_value: float, weight_type: Literal['time_min', 'length_meter'], nx_graph: nx.Graph, step: float = None, **kwargs: Any) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]

Calculate stepped accessibility isochrones for a single point with specified intervals.

Parameters:

Name Type Description Default
isochrone_type Literal['radius', 'ways', 'separate']

Visualization method for stepped isochrones: - "radius": Voronoi-based in circular buffers - "ways": Voronoi-based in road network polygons - "separate": Circular buffers for each step

required
point GeoDataFrame

Single source point for isochrone calculation (uses first geometry if multiple provided).

required
weight_value float

Maximum travel time (minutes) or distance (meters) threshold.

required
weight_type Literal['time_min', 'length_meter']

Type of weight calculation: - "time_min": Time-based in minutes - "length_meter": Distance-based in meters

required
nx_graph Graph

NetworkX graph representing the transportation network.

required
step float

Interval between isochrone steps. Defaults to: - 100 meters for distance-based - 1 minute for time-based

None
**kwargs Any

Additional parameters: - buffer_factor: Size multiplier for buffers (default: 0.7) - road_buffer_size: Buffer size for road edges in meters (default: 5)

{}

Returns:

Type Description
tuple[GeoDataFrame, GeoDataFrame | None, GeoDataFrame | None]

Tuple containing: - stepped_isochrones: GeoDataFrame with stepped polygons and distance/time attributes - pt_stops: Public transport stops within isochrones (if available) - pt_routes: Public transport routes within isochrones (if available)

Source code in src\objectnat\methods\isochrones\isochrones.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def get_accessibility_isochrone_stepped(
    isochrone_type: Literal["radius", "ways", "separate"],
    point: gpd.GeoDataFrame,
    weight_value: float,
    weight_type: Literal["time_min", "length_meter"],
    nx_graph: nx.Graph,
    step: float = None,
    **kwargs: Any,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]:
    """
    Calculate stepped accessibility isochrones for a single point with specified intervals.

    Parameters:
        isochrone_type (Literal["radius", "ways", "separate"]):
            Visualization method for stepped isochrones:
            - "radius": Voronoi-based in circular buffers
            - "ways": Voronoi-based in road network polygons
            - "separate": Circular buffers for each step
        point (gpd.GeoDataFrame):
            Single source point for isochrone calculation (uses first geometry if multiple provided).
        weight_value (float):
            Maximum travel time (minutes) or distance (meters) threshold.
        weight_type (Literal["time_min", "length_meter"]):
            Type of weight calculation:
            - "time_min": Time-based in minutes
            - "length_meter": Distance-based in meters
        nx_graph (nx.Graph):
            NetworkX graph representing the transportation network.
        step (float, optional):
            Interval between isochrone steps. Defaults to:
            - 100 meters for distance-based
            - 1 minute for time-based
        **kwargs: Additional parameters:
            - buffer_factor: Size multiplier for buffers (default: 0.7)
            - road_buffer_size: Buffer size for road edges in meters (default: 5)

    Returns:
        (tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]):
            Tuple containing:
            - stepped_isochrones: GeoDataFrame with stepped polygons and distance/time attributes
            - pt_stops: Public transport stops within isochrones (if available)
            - pt_routes: Public transport routes within isochrones (if available)
    """
    buffer_params = {
        "buffer_factor": 0.7,
        "road_buffer_size": 5,
    }

    buffer_params.update(kwargs)
    original_crs = point.crs
    point = point.copy()
    if len(point) > 1:
        logger.warning(
            f"This method processes only single point. The GeoDataFrame contains {len(point)} points - "
            "only the first geometry will be used for isochrone calculation. "
        )
        point = point.iloc[[0]]

    local_crs, graph_type = _validate_inputs(point, weight_value, weight_type, nx_graph)

    if step is None:
        if weight_type == "length_meter":
            step = 100
        else:
            step = 1
    nx_graph, points, dist_nearest, speed = _prepare_graph_and_nodes(
        point, nx_graph, graph_type, weight_type, weight_value
    )

    dist_matrix, subgraph = _calculate_distance_matrix(
        nx_graph, points["nearest_node"].values, weight_type, weight_value, dist_nearest
    )

    logger.info("Building isochrones geometry...")
    nodes, edges = graph_to_gdf(subgraph)
    nodes.loc[dist_matrix.columns, "dist"] = dist_matrix.iloc[0]

    if isochrone_type == "separate":
        stepped_iso = create_separated_dist_polygons(nodes, weight_value, weight_type, step, speed)
    else:
        if isochrone_type == "radius":
            isochrone_geoms = _build_radius_isochrones(
                dist_matrix, weight_value, weight_type, speed, nodes, buffer_params["buffer_factor"]
            )
        else:  # isochrone_type == 'ways':
            if graph_type in ["intermodal", "walk"]:
                isochrone_edges = edges[edges["type"] == "walk"]
            else:
                isochrone_edges = edges.copy()
            all_isochrones_edges = isochrone_edges.buffer(buffer_params["road_buffer_size"], resolution=1).union_all()
            all_isochrones_edges = gpd.GeoDataFrame(geometry=[all_isochrones_edges], crs=local_crs)
            isochrone_geoms = _build_ways_isochrones(
                dist_matrix=dist_matrix,
                weight_value=weight_value,
                weight_type=weight_type,
                speed=speed,
                nodes=nodes,
                all_isochrones_edges=all_isochrones_edges,
                buffer_factor=buffer_params["buffer_factor"],
            )
        nodes = nodes.clip(isochrone_geoms[0], keep_geom_type=True)
        nodes["dist"] = np.minimum(np.ceil(nodes["dist"] / step) * step, weight_value)
        voronois = gpd.GeoDataFrame(geometry=nodes.voronoi_polygons(), crs=local_crs)
        stepped_iso = (
            voronois.sjoin(nodes[["dist", "geometry"]]).dissolve(by="dist", as_index=False).drop(columns="index_right")
        )
        stepped_iso = stepped_iso.clip(isochrone_geoms[0], keep_geom_type=True)

    pt_nodes, pt_edges = _process_pt_data(nodes, edges, graph_type)
    if pt_nodes is not None:
        pt_nodes.to_crs(original_crs, inplace=True)
    if pt_edges is not None:
        pt_edges.to_crs(original_crs, inplace=True)
    return stepped_iso.to_crs(original_crs), pt_nodes, pt_edges

get_accessibility_isochrones(isochrone_type: Literal['radius', 'ways'], points: gpd.GeoDataFrame, weight_value: float, weight_type: Literal['time_min', 'length_meter'], nx_graph: nx.Graph, **kwargs: Any) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]

Calculate accessibility isochrones from input points based on the provided city graph.

Supports two types of isochrones
  • 'radius': Circular buffer-based isochrones
  • 'ways': Road network-based isochrones

Parameters:

Name Type Description Default
isochrone_type Literal['radius', 'ways']

Type of isochrone to calculate: - "radius": Creates circular buffers around reachable nodes - "ways": Creates polygons based on reachable road network

required
points GeoDataFrame

GeoDataFrame containing source points for isochrone calculation.

required
weight_value float

Maximum travel time (minutes) or distance (meters) threshold.

required
weight_type Literal['time_min', 'length_meter']

Type of weight calculation: - "time_min": Time-based accessibility in minutes - "length_meter": Distance-based accessibility in meters

required
nx_graph Graph

NetworkX graph representing the transportation network. Must contain CRS and speed attributes for time calculations.

required
**kwargs Any

Additional parameters: - buffer_factor: Size multiplier for buffers (default: 0.7) - road_buffer_size: Buffer size for road edges in meters (default: 5)

{}

Returns:

Type Description
tuple[GeoDataFrame, GeoDataFrame | None, GeoDataFrame | None]

Tuple containing: - isochrones: GeoDataFrame with calculated isochrone polygons - pt_stops: Public transport stops within isochrones (if available) - pt_routes: Public transport routes within isochrones (if available)

Source code in src\objectnat\methods\isochrones\isochrones.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def get_accessibility_isochrones(
    isochrone_type: Literal["radius", "ways"],
    points: gpd.GeoDataFrame,
    weight_value: float,
    weight_type: Literal["time_min", "length_meter"],
    nx_graph: nx.Graph,
    **kwargs: Any,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]:
    """
    Calculate accessibility isochrones from input points based on the provided city graph.

    Supports two types of isochrones:
        - 'radius': Circular buffer-based isochrones
        - 'ways': Road network-based isochrones

    Parameters:
        isochrone_type (Literal["radius", "ways"]):
            Type of isochrone to calculate:
            - "radius": Creates circular buffers around reachable nodes
            - "ways": Creates polygons based on reachable road network
        points (gpd.GeoDataFrame):
            GeoDataFrame containing source points for isochrone calculation.
        weight_value (float):
            Maximum travel time (minutes) or distance (meters) threshold.
        weight_type (Literal["time_min", "length_meter"]):
            Type of weight calculation:
            - "time_min": Time-based accessibility in minutes
            - "length_meter": Distance-based accessibility in meters
        nx_graph (nx.Graph):
            NetworkX graph representing the transportation network.
            Must contain CRS and speed attributes for time calculations.
        **kwargs: Additional parameters:
            - buffer_factor: Size multiplier for buffers (default: 0.7)
            - road_buffer_size: Buffer size for road edges in meters (default: 5)

    Returns:
        (tuple[gpd.GeoDataFrame, gpd.GeoDataFrame | None, gpd.GeoDataFrame | None]):
            Tuple containing:
            - isochrones: GeoDataFrame with calculated isochrone polygons
            - pt_stops: Public transport stops within isochrones (if available)
            - pt_routes: Public transport routes within isochrones (if available)

    """

    buffer_params = {
        "buffer_factor": 0.7,
        "road_buffer_size": 5,
    }
    original_crs = points.crs
    buffer_params.update(kwargs)

    points = points.copy()
    local_crs, graph_type = _validate_inputs(points, weight_value, weight_type, nx_graph)

    nx_graph, points, dist_nearest, speed = _prepare_graph_and_nodes(
        points, nx_graph, graph_type, weight_type, weight_value
    )

    weight_cutoff = (
        weight_value + (100 if weight_type == "length_meter" else 1) if isochrone_type == "ways" else weight_value
    )

    dist_matrix, subgraph = _calculate_distance_matrix(
        nx_graph, points["nearest_node"].values, weight_type, weight_cutoff, dist_nearest
    )

    logger.info("Building isochrones geometry...")
    nodes, edges = graph_to_gdf(subgraph)
    if isochrone_type == "radius":
        isochrone_geoms = _build_radius_isochrones(
            dist_matrix, weight_value, weight_type, speed, nodes, buffer_params["buffer_factor"]
        )
    else:  # isochrone_type == 'ways':
        if graph_type in ["intermodal", "walk"]:
            isochrone_edges = edges[edges["type"] == "walk"]
        else:
            isochrone_edges = edges.copy()
        all_isochrones_edges = isochrone_edges.buffer(buffer_params["road_buffer_size"], resolution=1).union_all()
        all_isochrones_edges = gpd.GeoDataFrame(geometry=[all_isochrones_edges], crs=local_crs)
        isochrone_geoms = _build_ways_isochrones(
            dist_matrix=dist_matrix,
            weight_value=weight_value,
            weight_type=weight_type,
            speed=speed,
            nodes=nodes,
            all_isochrones_edges=all_isochrones_edges,
            buffer_factor=buffer_params["buffer_factor"],
        )
    isochrones = _create_isochrones_gdf(points, isochrone_geoms, dist_matrix, local_crs, weight_type, weight_value)
    pt_nodes, pt_edges = _process_pt_data(nodes, edges, graph_type)
    if pt_nodes is not None:
        pt_nodes.to_crs(original_crs, inplace=True)
    if pt_edges is not None:
        pt_edges.to_crs(original_crs, inplace=True)
    return isochrones.to_crs(original_crs), pt_nodes, pt_edges

get_clusters_polygon(points: gpd.GeoDataFrame, min_dist: float | int = 100, min_point: int = 5, method: Literal['DBSCAN', 'HDBSCAN'] = 'HDBSCAN', service_code_column: str = 'service_code') -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]

Generate cluster polygons for given points based on a specified minimum distance and minimum points per cluster. Optionally, calculate the relative ratio between types of points within the clusters.

Parameters:

Name Type Description Default
points GeoDataFrame

GeoDataFrame containing the points to be clustered. Must include a 'service_code' column for service ratio calculations.

required
min_dist float | int

Minimum distance between points to be considered part of the same cluster. Defaults to 100.

100
min_point int

Minimum number of points required to form a cluster. Defaults to 5.

5
method Literal['DBSCAN', 'HDBSCAN']

The clustering method to use. Must be either "DBSCAN" or "HDBSCAN". Defaults to "HDBSCAN".

'HDBSCAN'
service_code_column str

Column, containing service type for relative ratio in clasterized polygons. Defaults to "service_code".

'service_code'

Returns:

Type Description
tuple[GeoDataFrame, GeoDataFrame]

A tuple containing the clustered polygons GeoDataFrame and the original points GeoDataFrame with cluster labels.

Source code in src\objectnat\methods\point_clustering\cluster_points_in_polygons.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get_clusters_polygon(
    points: gpd.GeoDataFrame,
    min_dist: float | int = 100,
    min_point: int = 5,
    method: Literal["DBSCAN", "HDBSCAN"] = "HDBSCAN",
    service_code_column: str = "service_code",
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Generate cluster polygons for given points based on a specified minimum distance and minimum points per cluster.
    Optionally, calculate the relative ratio between types of points within the clusters.

    Parameters:
        points (gpd.GeoDataFrame):
            GeoDataFrame containing the points to be clustered.
            Must include a 'service_code' column for service ratio calculations.
        min_dist (float | int, optional):
            Minimum distance between points to be considered part of the same cluster. Defaults to 100.
        min_point (int, optional):
            Minimum number of points required to form a cluster. Defaults to 5.
        method (Literal["DBSCAN", "HDBSCAN"], optional):
            The clustering method to use. Must be either "DBSCAN" or "HDBSCAN". Defaults to "HDBSCAN".
        service_code_column (str, optional):
            Column, containing service type for relative ratio in clasterized polygons. Defaults to "service_code".

    Returns:
        (tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]):
            A tuple containing the clustered polygons GeoDataFrame and the original points GeoDataFrame with cluster labels.
    """
    if method not in ["DBSCAN", "HDBSCAN"]:
        raise ValueError("Method must be either 'DBSCAN' or 'HDBSCAN'")
    original_crs = points.crs
    local_crs = points.estimate_utm_crs()
    points = points.to_crs(local_crs)
    services_select = _get_cluster(points, min_dist, min_point, method)

    if service_code_column not in points.columns:
        logger.warning(
            f"No {service_code_column} column in provided GeoDataFrame, cluster polygons will be without relative ratio"
        )
        points[service_code_column] = service_code_column

    points_normal = services_select[services_select["cluster"] != -1].copy()
    points_outlier = services_select[services_select["cluster"] == -1].copy()

    if len(points_normal) > 0:
        cluster_service = points_normal.groupby("cluster", group_keys=True).apply(
            _get_service_ratio, service_code_column=service_code_column
        )
        if isinstance(cluster_service, pd.Series):
            cluster_service = cluster_service.unstack(level=1, fill_value=0)

        polygons_normal = points_normal.dissolve("cluster").concave_hull(ratio=0.1, allow_holes=True)
        df_clusters_normal = pd.concat([cluster_service, polygons_normal.rename("geometry")], axis=1)
        cluster_normal = df_clusters_normal.index.max()
        points_normal["outlier"] = False
        df_clusters_normal["outlier"] = False
    else:
        df_clusters_normal = None
        cluster_normal = 0

    if len(points_outlier) > 0:
        clusters_outlier = cluster_normal + 1
        new_clusters = list(range(clusters_outlier, clusters_outlier + len(points_outlier)))
        points_outlier.loc[:, "cluster"] = new_clusters

        cluster_service = points_outlier.groupby("cluster", group_keys=True).apply(
            _get_service_ratio, service_code_column=service_code_column
        )
        if isinstance(cluster_service, pd.Series):
            cluster_service = cluster_service.unstack(level=1, fill_value=0)

        df_clusters_outlier = cluster_service.join(points_outlier.set_index("cluster")["geometry"])
        points_outlier["outlier"] = True
        df_clusters_outlier["outlier"] = True
    else:
        points_outlier = None
        df_clusters_outlier = None

    df_clusters = pd.concat([df_clusters_normal, df_clusters_outlier]).fillna(0).set_geometry("geometry")
    df_clusters["geometry"] = df_clusters["geometry"].buffer(min_dist / 2)
    df_clusters = df_clusters.reset_index().rename(columns={"index": "cluster"})

    points = pd.concat([points_normal, points_outlier])

    return df_clusters.to_crs(original_crs), points.to_crs(original_crs)

get_graph_coverage(gdf_to: gpd.GeoDataFrame, nx_graph: nx.Graph, weight_type: Literal['time_min', 'length_meter'], weight_value_cutoff: float = None, zone: gpd.GeoDataFrame = None)

Calculate coverage zones from source points through a graph network using Dijkstra's algorithm and Voronoi diagrams.

The function works by
  1. Finding nearest graph nodes for each input point
  2. Calculating all reachable nodes within cutoff distance using Dijkstra
  3. Creating Voronoi polygons around graph nodes
  4. Combining reachability information with Voronoi cells
  5. Clipping results to specified zone boundary

Parameters:

Name Type Description Default
gdf_to GeoDataFrame

Source points to which coverage is calculated.

required
nx_graph Graph

NetworkX graph representing the transportation network.

required
weight_type Literal['time_min', 'length_meter']

Edge attribute to use as weight for path calculations.

required
weight_value_cutoff float

Maximum weight value for path calculations (e.g., max travel time/distance).

None
zone GeoDataFrame

Boundary polygon to clip the resulting coverage zones. If None, concave hull of reachable nodes will be used.

None

Returns:

Type Description
GeoDataFrame

GeoDataFrame with coverage zones polygons, each associated with its source point, returns in the same CRS as original gdf_from.

Notes
  • The graph must have a valid CRS attribute in its graph properties
  • MultiGraph/MultiDiGraph inputs will be converted to simple Graph/DiGraph
Source code in src\objectnat\methods\coverage_zones\graph_coverage.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_graph_coverage(
    gdf_to: gpd.GeoDataFrame,
    nx_graph: nx.Graph,
    weight_type: Literal["time_min", "length_meter"],
    weight_value_cutoff: float = None,
    zone: gpd.GeoDataFrame = None,
):
    """
    Calculate coverage zones from source points through a graph network using Dijkstra's algorithm
    and Voronoi diagrams.

    The function works by:
        1. Finding nearest graph nodes for each input point
        2. Calculating all reachable nodes within cutoff distance using Dijkstra
        3. Creating Voronoi polygons around graph nodes
        4. Combining reachability information with Voronoi cells
        5. Clipping results to specified zone boundary

    Parameters:
        gdf_to (gpd.GeoDataFrame):
            Source points to which coverage is calculated.
        nx_graph (nx.Graph):
            NetworkX graph representing the transportation network.
        weight_type (Literal["time_min", "length_meter"]):
            Edge attribute to use as weight for path calculations.
        weight_value_cutoff (float):
            Maximum weight value for path calculations (e.g., max travel time/distance).
        zone (gpd.GeoDataFrame):
            Boundary polygon to clip the resulting coverage zones. If None, concave hull of reachable nodes will be used.

    Returns:
        (gpd.GeoDataFrame):
            GeoDataFrame with coverage zones polygons, each associated with its source point, returns in the same CRS
            as original gdf_from.

    Notes:
        - The graph must have a valid CRS attribute in its graph properties
        - MultiGraph/MultiDiGraph inputs will be converted to simple Graph/DiGraph
    """
    original_crs = gdf_to.crs
    try:
        local_crs = nx_graph.graph["crs"]
    except KeyError as exc:
        raise ValueError("Graph does not have crs attribute") from exc

    try:
        points = gdf_to.copy()
        points.to_crs(local_crs, inplace=True)
    except CRSError as e:
        raise CRSError(f"Graph crs ({local_crs}) has invalid format.") from e

    nx_graph, reversed_graph = reverse_graph(nx_graph, weight_type)

    points.geometry = points.representative_point()

    _, nearest_nodes = get_closest_nodes_from_gdf(points, nx_graph)

    points["nearest_node"] = nearest_nodes

    nearest_paths = nx.multi_source_dijkstra_path(
        reversed_graph, nearest_nodes, weight=weight_type, cutoff=weight_value_cutoff
    )
    reachable_nodes = list(nearest_paths.keys())
    graph_points = pd.DataFrame(
        data=[{"node": node, "geometry": Point(data["x"], data["y"])} for node, data in nx_graph.nodes(data=True)]
    ).set_index("node")
    nearest_nodes = pd.DataFrame(
        data=[path[0] for path in nearest_paths.values()], index=reachable_nodes, columns=["node_to"]
    )
    graph_nodes_gdf = gpd.GeoDataFrame(
        graph_points.merge(nearest_nodes, left_index=True, right_index=True, how="left"),
        geometry="geometry",
        crs=local_crs,
    )
    graph_nodes_gdf["node_to"] = graph_nodes_gdf["node_to"].fillna("non_reachable")
    voronois = gpd.GeoDataFrame(geometry=graph_nodes_gdf.voronoi_polygons(), crs=local_crs)
    graph_nodes_gdf = graph_nodes_gdf[graph_nodes_gdf["node_to"] != "non_reachable"]
    zone_coverages = voronois.sjoin(graph_nodes_gdf).dissolve(by="node_to").reset_index().drop(columns=["node"])
    zone_coverages = zone_coverages.merge(
        points.drop(columns="geometry"), left_on="node_to", right_on="nearest_node", how="inner"
    ).reset_index(drop=True)
    zone_coverages.drop(columns=["node_to", "nearest_node"], inplace=True)
    if zone is None:
        zone = concave_hull(graph_nodes_gdf[~graph_nodes_gdf["node_to"].isna()].union_all(), ratio=0.5)
    else:
        zone = zone.to_crs(local_crs)
    return zone_coverages.clip(zone).to_crs(original_crs)

get_radius_coverage(gdf_from: gpd.GeoDataFrame, radius: float, resolution: int = 32)

Calculate radius-based coverage zones using Voronoi polygons.

Parameters:

Name Type Description Default
gdf_from GeoDataFrame

Source points for which coverage zones are calculated.

required
radius float

Maximum coverage radius in meters.

required
resolution int

Number of segments used to approximate quarter-circle in buffer (default=32).

32

Returns:

Type Description
GeoDataFrame

GeoDataFrame with smoothed coverage zone polygons in the same CRS as original gdf_from.

Notes
  • Automatically converts to local UTM CRS for accurate distance measurements
  • Final zones are slightly contracted then expanded for smoothing effect
Source code in src\objectnat\methods\coverage_zones\radius_voronoi_coverage.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def get_radius_coverage(gdf_from: gpd.GeoDataFrame, radius: float, resolution: int = 32):
    """
    Calculate radius-based coverage zones using Voronoi polygons.

    Parameters:
        gdf_from (gpd.GeoDataFrame):
            Source points for which coverage zones are calculated.
        radius (float):
            Maximum coverage radius in meters.
        resolution (int):
            Number of segments used to approximate quarter-circle in buffer (default=32).

    Returns:
        (gpd.GeoDataFrame):
            GeoDataFrame with smoothed coverage zone polygons in the same CRS as original gdf_from.

    Notes:
        - Automatically converts to local UTM CRS for accurate distance measurements
        - Final zones are slightly contracted then expanded for smoothing effect
    """
    original_crs = gdf_from.crs
    local_crs = gdf_from.estimate_utm_crs()
    gdf_from = gdf_from.to_crs(local_crs)
    bounds = gdf_from.buffer(radius).union_all()
    coverage_polys = gpd.GeoDataFrame(geometry=gdf_from.voronoi_polygons().clip(bounds, keep_geom_type=True))
    coverage_polys = coverage_polys.sjoin(gdf_from)
    coverage_polys["area"] = coverage_polys.area
    coverage_polys["buffer"] = np.pow(coverage_polys["area"], 1 / 3)
    coverage_polys.geometry = coverage_polys.buffer(-coverage_polys["buffer"], resolution=1, join_style="mitre").buffer(
        coverage_polys["buffer"] * 0.9, resolution=resolution
    )
    coverage_polys.drop(columns=["buffer", "area"], inplace=True)
    return coverage_polys.to_crs(original_crs)

get_service_provision(buildings: gpd.GeoDataFrame, adjacency_matrix: pd.DataFrame, services: gpd.GeoDataFrame, threshold: int, buildings_demand_column: str = 'demand', services_capacity_column: str = 'capacity') -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, gpd.GeoDataFrame]

Calculate load from buildings with demands on the given services using the distances matrix between them.

Parameters:

Name Type Description Default
services GeoDataFrame

GeoDataFrame of services

required
adjacency_matrix DataFrame

DataFrame representing the adjacency matrix

required
buildings GeoDataFrame

GeoDataFrame of demanded buildings

required
threshold int

Threshold value

required
buildings_demand_column str

column name of buildings demands

'demand'
services_capacity_column str

column name of services capacity

'capacity'

Returns:

Type Description
Tuple[GeoDataFrame, GeoDataFrame, GeoDataFrame]

Tuple of GeoDataFrames representing provision

GeoDataFrame

buildings, provision services, and provision links

Source code in src\objectnat\methods\provision\provision.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def get_service_provision(
    buildings: gpd.GeoDataFrame,
    adjacency_matrix: pd.DataFrame,
    services: gpd.GeoDataFrame,
    threshold: int,
    buildings_demand_column: str = "demand",
    services_capacity_column: str = "capacity",
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """Calculate load from buildings with demands on the given services using the distances matrix between them.

    Parameters:
        services (gpd.GeoDataFrame):
            GeoDataFrame of services
        adjacency_matrix (pd.DataFrame):
            DataFrame representing the adjacency matrix
        buildings (gpd.GeoDataFrame):
            GeoDataFrame of demanded buildings
        threshold (int):
            Threshold value
        buildings_demand_column (str):
            column name of buildings demands
        services_capacity_column (str):
            column name of services capacity

    Returns:
        (Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, gpd.GeoDataFrame]): Tuple of GeoDataFrames representing provision
        buildings, provision services, and provision links
    """
    buildings = buildings.copy()
    services = services.copy()
    adjacency_matrix = adjacency_matrix.copy()
    buildings["demand"] = buildings[buildings_demand_column]
    services["capacity"] = services[services_capacity_column]

    provision_buildings, provision_services, provision_links = Provision(
        services=services,
        demanded_buildings=buildings,
        adjacency_matrix=adjacency_matrix,
        threshold=threshold,
    ).run()
    return provision_buildings, provision_services, provision_links

get_stepped_graph_coverage(gdf_to: gpd.GeoDataFrame, nx_graph: nx.Graph, weight_type: Literal['time_min', 'length_meter'], step_type: Literal['voronoi', 'separate'], weight_value_cutoff: float = None, zone: gpd.GeoDataFrame = None, step: float = None)

Calculate stepped coverage zones from source points through a graph network using Dijkstra's algorithm and Voronoi-based or buffer-based isochrone steps.

This function combines graph-based accessibility with stepped isochrone logic. It: 1. Finds nearest graph nodes for each input point 2. Computes reachability for increasing weights (e.g. time or distance) in defined steps 3. Generates Voronoi-based or separate buffer zones around network nodes 4. Aggregates zones into stepped coverage layers 5. Optionally clips results to a boundary zone

Parameters:

Name Type Description Default
gdf_to GeoDataFrame

Source points from which stepped coverage is calculated.

required
nx_graph Graph

NetworkX graph representing the transportation network.

required
weight_type Literal['time_min', 'length_meter']

Type of edge weight to use for path calculation: - "time_min": Edge travel time in minutes - "length_meter": Edge length in meters

required
step_type Literal['voronoi', 'separate']

Method for generating stepped zones: - "voronoi": Stepped zones based on Voronoi polygons around graph nodes - "separate": Independent buffer zones per step

required
weight_value_cutoff float

Maximum weight value (e.g., max travel time or distance) to limit the coverage extent.

None
zone GeoDataFrame

Optional boundary polygon to clip resulting stepped zones. If None, concave hull of reachable area is used.

None
step float

Step interval for coverage zone construction. Defaults to: - 100 meters for distance-based weight - 1 minute for time-based weight

None

Returns:

Type Description
GeoDataFrame

GeoDataFrame with polygons representing stepped coverage zones for each input point, annotated by step range.

Notes
  • Input graph must have a valid CRS defined.
  • MultiGraph or MultiDiGraph inputs will be simplified to Graph/DiGraph.
  • Designed for accessibility and spatial equity analyses over multimodal networks.
Source code in src\objectnat\methods\coverage_zones\stepped_coverage.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_stepped_graph_coverage(
    gdf_to: gpd.GeoDataFrame,
    nx_graph: nx.Graph,
    weight_type: Literal["time_min", "length_meter"],
    step_type: Literal["voronoi", "separate"],
    weight_value_cutoff: float = None,
    zone: gpd.GeoDataFrame = None,
    step: float = None,
):
    """
    Calculate stepped coverage zones from source points through a graph network using Dijkstra's algorithm
    and Voronoi-based or buffer-based isochrone steps.

    This function combines graph-based accessibility with stepped isochrone logic. It:
        1. Finds nearest graph nodes for each input point
        2. Computes reachability for increasing weights (e.g. time or distance) in defined steps
        3. Generates Voronoi-based or separate buffer zones around network nodes
        4. Aggregates zones into stepped coverage layers
        5. Optionally clips results to a boundary zone

    Parameters:
        gdf_to (gpd.GeoDataFrame):
            Source points from which stepped coverage is calculated.
        nx_graph (nx.Graph):
            NetworkX graph representing the transportation network.
        weight_type (Literal["time_min", "length_meter"]):
            Type of edge weight to use for path calculation:
            - "time_min": Edge travel time in minutes
            - "length_meter": Edge length in meters
        step_type (Literal["voronoi", "separate"]):
            Method for generating stepped zones:
            - "voronoi": Stepped zones based on Voronoi polygons around graph nodes
            - "separate": Independent buffer zones per step
        weight_value_cutoff (float, optional):
            Maximum weight value (e.g., max travel time or distance) to limit the coverage extent.
        zone (gpd.GeoDataFrame, optional):
            Optional boundary polygon to clip resulting stepped zones. If None, concave hull of reachable area is used.
        step (float, optional):
            Step interval for coverage zone construction. Defaults to:
                - 100 meters for distance-based weight
                - 1 minute for time-based weight

    Returns:
        (gpd.GeoDataFrame): GeoDataFrame with polygons representing stepped coverage zones for each input point,
            annotated by step range.

    Notes:
        - Input graph must have a valid CRS defined.
        - MultiGraph or MultiDiGraph inputs will be simplified to Graph/DiGraph.
        - Designed for accessibility and spatial equity analyses over multimodal networks.
    """
    if step is None:
        if weight_type == "length_meter":
            step = 100
        else:
            step = 1
    original_crs = gdf_to.crs
    try:
        local_crs = nx_graph.graph["crs"]
    except KeyError as exc:
        raise ValueError("Graph does not have crs attribute") from exc

    try:
        points = gdf_to.copy()
        points.to_crs(local_crs, inplace=True)
    except CRSError as e:
        raise CRSError(f"Graph crs ({local_crs}) has invalid format.") from e

    nx_graph, reversed_graph = reverse_graph(nx_graph, weight_type)

    points.geometry = points.representative_point()

    distances, nearest_nodes = get_closest_nodes_from_gdf(points, nx_graph)

    points["nearest_node"] = nearest_nodes
    points["distance"] = distances

    dist = nx.multi_source_dijkstra_path_length(
        reversed_graph, nearest_nodes, weight=weight_type, cutoff=weight_value_cutoff
    )

    graph_points = pd.DataFrame(
        data=[{"node": node, "geometry": Point(data["x"], data["y"])} for node, data in nx_graph.nodes(data=True)]
    )

    nearest_nodes = pd.DataFrame.from_dict(dist, orient="index", columns=["dist"]).reset_index()

    graph_nodes_gdf = gpd.GeoDataFrame(
        graph_points.merge(nearest_nodes, left_on="node", right_on="index", how="left").reset_index(drop=True),
        geometry="geometry",
        crs=local_crs,
    )
    graph_nodes_gdf.drop(columns=["index", "node"], inplace=True)
    if weight_value_cutoff is None:
        weight_value_cutoff = max(nearest_nodes["dist"])
    if step_type == "voronoi":
        graph_nodes_gdf["dist"] = np.minimum(np.ceil(graph_nodes_gdf["dist"] / step) * step, weight_value_cutoff)
        voronois = gpd.GeoDataFrame(geometry=graph_nodes_gdf.voronoi_polygons(), crs=local_crs)
        zone_coverages = voronois.sjoin(graph_nodes_gdf).dissolve(by="dist", as_index=False, dropna=False)
        zone_coverages = zone_coverages[["dist", "geometry"]].explode(ignore_index=True)
        if zone is None:
            zone = concave_hull(graph_nodes_gdf[~graph_nodes_gdf["node_to"].isna()].union_all(), ratio=0.5)
        else:
            zone = zone.to_crs(local_crs)
        zone_coverages = zone_coverages.clip(zone).to_crs(original_crs)
    else:  # step_type == 'separate':
        speed = 83.33  # TODO HARDCODED WALK SPEED
        weight_value = weight_value_cutoff
        zone_coverages = create_separated_dist_polygons(graph_nodes_gdf, weight_value, weight_type, step, speed)
        if zone is not None:
            zone = zone.to_crs(local_crs)
            zone_coverages = zone_coverages.clip(zone).to_crs(original_crs)
    return zone_coverages

get_visibilities_from_points(points: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: int, sectors_n=None, max_workers: int = cpu_count()) -> list[Polygon]

Calculate visibility polygons from a set of points considering obstacles within a specified view distance.

Parameters:

Name Type Description Default
points GeoDataFrame

GeoDataFrame containing the points from which visibility is calculated.

required
obstacles GeoDataFrame

GeoDataFrame containing the obstacles that block visibility.

required
view_distance int

The maximum distance from each point within which visibility is calculated.

required
sectors_n int

Number of sectors to divide the view into for more detailed visibility calculations. Defaults to None.

None
max_workers int

Maximum workers in multiproccesing, multipocessing.cpu_count() by default.

cpu_count()

Returns:

Type Description
list[Polygon]

A list of visibility polygons for each input point.

Notes

This function uses get_visibility_accurate() in multiprocessing way.

Source code in src\objectnat\methods\visibility\visibility_analysis.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def get_visibilities_from_points(
    points: gpd.GeoDataFrame,
    obstacles: gpd.GeoDataFrame,
    view_distance: int,
    sectors_n=None,
    max_workers: int = cpu_count(),
) -> list[Polygon]:
    """
    Calculate visibility polygons from a set of points considering obstacles within a specified view distance.

    Parameters:
        points (gpd.GeoDataFrame):
            GeoDataFrame containing the points from which visibility is calculated.
        obstacles (gpd.GeoDataFrame):
            GeoDataFrame containing the obstacles that block visibility.
        view_distance (int):
            The maximum distance from each point within which visibility is calculated.
        sectors_n (int, optional):
            Number of sectors to divide the view into for more detailed visibility calculations. Defaults to None.
        max_workers (int, optional):
            Maximum workers in multiproccesing, multipocessing.cpu_count() by default.

    Returns:
        (list[Polygon]):
            A list of visibility polygons for each input point.

    Notes:
        This function uses `get_visibility_accurate()` in multiprocessing way.

    """
    if points.crs != obstacles.crs:
        raise ValueError(f"CRS mismatch, points crs:{points.crs} != obstacles crs:{obstacles.crs}")
    if points.crs.is_geographic:
        logger.warning("Points crs is geographic, it may produce invalid results")
    # remove points inside polygons
    joined = gpd.sjoin(points, obstacles, how="left", predicate="intersects")
    points = joined[joined.index_right.isnull()]

    # remove unused obstacles
    points_view = points.geometry.buffer(view_distance).union_all()
    s = obstacles.intersects(points_view)
    buildings_in_buffer = obstacles.loc[s[s].index].reset_index(drop=True)

    buildings_in_buffer.geometry = buildings_in_buffer.geometry.apply(
        lambda geom: MultiPolygon([geom]) if isinstance(geom, Polygon) else geom
    )
    args = [(point, buildings_in_buffer, view_distance, sectors_n) for point in points.geometry]
    all_visions = process_map(
        _multiprocess_get_vis,
        args,
        chunksize=5,
        desc="Calculating Visibility Catchment Area from each Point, it might take a while for a "
        "big amount of points",
        max_workers=max_workers,
    )

    # could return sectorized visions if sectors_n is set
    return all_visions

get_visibility(point_from: Point | gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: float, resolution: int = 32) -> Polygon | gpd.GeoDataFrame

Function to get a quick estimate of visibility from a given point to buildings within a given distance.

Parameters:

Name Type Description Default
point_from Point | GeoDataFrame

The point or GeoDataFrame with 1 point from which the line of sight is drawn. If Point is provided it should be in the same crs as obstacles.

required
obstacles GeoDataFrame

A GeoDataFrame containing the geometry of the buildings.

required
view_distance float

The distance of view from the point.

required
resolution int)

Buffer resolution for more accuracy (may give result slower)

32

Returns:

Type Description
Polygon | GeoDataFrame

A polygon representing the area of visibility from the given point. if point_from was a GeoDataFrame, return GeoDataFrame with one feature, else Polygon.

Notes

This function provides a quicker but less accurate result compared to get_visibility_accurate(). If accuracy is important, consider using get_visibility_accurate() instead.

Source code in src\objectnat\methods\visibility\visibility_analysis.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def get_visibility(
    point_from: Point | gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: float, resolution: int = 32
) -> Polygon | gpd.GeoDataFrame:
    """
    Function to get a quick estimate of visibility from a given point to buildings within a given distance.

    Parameters:
        point_from (Point | gpd.GeoDataFrame):
            The point or GeoDataFrame with 1 point from which the line of sight is drawn.
            If Point is provided it should be in the same crs as obstacles.
        obstacles (gpd.GeoDataFrame):
            A GeoDataFrame containing the geometry of the buildings.
        view_distance (float):
            The distance of view from the point.
        resolution (int) :
            Buffer resolution for more accuracy (may give result slower)

    Returns:
        (Polygon | gpd.GeoDataFrame):
            A polygon representing the area of visibility from the given point.
            if point_from was a GeoDataFrame, return GeoDataFrame with one feature, else Polygon.

    Notes:
        This function provides a quicker but less accurate result compared to `get_visibility_accurate()`.
        If accuracy is important, consider using `get_visibility_accurate()` instead.
    """
    return_gdf = False
    if isinstance(point_from, gpd.GeoDataFrame):
        original_crs = point_from.crs
        return_gdf = True
        if len(obstacles) > 0:
            local_crs = obstacles.estimate_utm_crs()
        else:
            local_crs = point_from.estimate_utm_crs()
        obstacles = obstacles.to_crs(local_crs)
        point_from = point_from.to_crs(local_crs)
        if len(point_from) > 1:
            logger.warning(
                f"This method processes only single point. The GeoDataFrame contains {len(point_from)} points - "
                "only the first geometry will be used for isochrone calculation. "
            )
        point_from = point_from.iloc[0].geometry
    else:
        obstacles = obstacles.copy()
    point_buffer = point_from.buffer(view_distance, resolution=resolution)
    s = obstacles.intersects(point_buffer)
    buildings_in_buffer = obstacles.loc[s[s].index].reset_index(drop=True)
    buffer_exterior_ = list(point_buffer.exterior.coords)
    line_geometry = [LineString([point_from, ext]) for ext in buffer_exterior_]
    buffer_lines_gdf = gpd.GeoDataFrame(geometry=line_geometry)
    united_buildings = buildings_in_buffer.union_all()
    if united_buildings:
        splited_lines = buffer_lines_gdf["geometry"].apply(lambda x: x.difference(united_buildings))
    else:
        splited_lines = buffer_lines_gdf["geometry"]

    splited_lines_gdf = gpd.GeoDataFrame(geometry=splited_lines).explode(index_parts=True)
    splited_lines_list = []

    for _, v in splited_lines_gdf.groupby(level=0):
        splited_lines_list.append(v.iloc[0]["geometry"].coords[-1])
    circuit = Polygon(splited_lines_list)
    if united_buildings:
        circuit = circuit.difference(united_buildings)

    if return_gdf:
        circuit = gpd.GeoDataFrame(geometry=[circuit], crs=local_crs).to_crs(original_crs)
    return circuit

get_visibility_accurate(point_from: Point | gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance, return_max_view_dist=False) -> Polygon | gpd.GeoDataFrame | tuple[Polygon | gpd.GeoDataFrame, float]

Function to get accurate visibility from a given point to buildings within a given distance.

Parameters:

Name Type Description Default
point_from Point | GeoDataFrame

The point or GeoDataFrame with 1 point from which the line of sight is drawn. If Point is provided it should be in the same crs as obstacles.

required
obstacles GeoDataFrame

A GeoDataFrame containing the geometry of the obstacles.

required
view_distance float

The distance of view from the point.

required
return_max_view_dist bool

If True, the max view distance is returned with view polygon in tuple.

False

Returns:

Type Description
Polygon | GeoDataFrame | tuple[Polygon | GeoDataFrame, float]

A polygon representing the area of visibility from the given point or polygon with max view distance. if point_from was a GeoDataFrame, return GeoDataFrame with one feature, else Polygon.

Notes

If a quick result is important, consider using the get_visibility() function instead. However, please note that get_visibility() may provide less accurate results.

Source code in src\objectnat\methods\visibility\visibility_analysis.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def get_visibility_accurate(
    point_from: Point | gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance, return_max_view_dist=False
) -> Polygon | gpd.GeoDataFrame | tuple[Polygon | gpd.GeoDataFrame, float]:
    """
    Function to get accurate visibility from a given point to buildings within a given distance.

    Parameters:
        point_from (Point | gpd.GeoDataFrame):
            The point or GeoDataFrame with 1 point from which the line of sight is drawn.
            If Point is provided it should be in the same crs as obstacles.
        obstacles (gpd.GeoDataFrame):
            A GeoDataFrame containing the geometry of the obstacles.
        view_distance (float):
            The distance of view from the point.
        return_max_view_dist (bool):
            If True, the max view distance is returned with view polygon in tuple.

    Returns:
        (Polygon | gpd.GeoDataFrame | tuple[Polygon | gpd.GeoDataFrame, float]):
            A polygon representing the area of visibility from the given point or polygon with max view distance.
            if point_from was a GeoDataFrame, return GeoDataFrame with one feature, else Polygon.

    Notes:
        If a quick result is important, consider using the `get_visibility()` function instead.
        However, please note that `get_visibility()` may provide less accurate results.
    """

    def find_furthest_point(point_from, view_polygon):
        try:
            res = round(max(Point(coords).distance(point_from) for coords in view_polygon.exterior.coords), 1)
        except Exception as e:
            print(view_polygon)
            raise e
        return res

    local_crs = None
    original_crs = None
    return_gdf = False
    if isinstance(point_from, gpd.GeoDataFrame):
        original_crs = point_from.crs
        return_gdf = True
        if len(obstacles) > 0:
            local_crs = obstacles.estimate_utm_crs()
        else:
            local_crs = point_from.estimate_utm_crs()
        obstacles = obstacles.to_crs(local_crs)
        point_from = point_from.to_crs(local_crs)
        if len(point_from) > 1:
            logger.warning(
                f"This method processes only single point. The GeoDataFrame contains {len(point_from)} points - "
                "only the first geometry will be used for isochrone calculation. "
            )
        point_from = point_from.iloc[0].geometry
    else:
        obstacles = obstacles.copy()
    if obstacles.contains(point_from).any():
        return Polygon()
    obstacles.reset_index(inplace=True, drop=True)
    point_buffer = point_from.buffer(view_distance, resolution=32)
    allowed_geom_types = ["MultiPolygon", "Polygon", "LineString", "MultiLineString"]
    obstacles = obstacles[obstacles.geom_type.isin(allowed_geom_types)]
    s = obstacles.intersects(point_buffer)
    obstacles_in_buffer = obstacles.loc[s[s].index].geometry

    buildings_lines_in_buffer = gpd.GeoSeries(
        pd.Series(
            obstacles_in_buffer.apply(polygons_to_multilinestring).explode(index_parts=False).apply(explode_linestring)
        ).explode()
    )

    buildings_lines_in_buffer = buildings_lines_in_buffer.loc[buildings_lines_in_buffer.intersects(point_buffer)]

    buildings_in_buffer_points = gpd.GeoSeries(
        [Point(line.coords[0]) for line in buildings_lines_in_buffer.geometry]
        + [Point(line.coords[-1]) for line in buildings_lines_in_buffer.geometry]
    )

    max_dist = max(view_distance, buildings_in_buffer_points.distance(point_from).max())
    polygons = []
    buildings_lines_in_buffer = gpd.GeoDataFrame(geometry=buildings_lines_in_buffer, crs=obstacles.crs).reset_index()
    logger.debug("Calculation vis polygon")
    while not buildings_lines_in_buffer.empty:
        gdf_sindex = buildings_lines_in_buffer.sindex
        # TODO check if 2 walls are nearest and use the widest angle between points
        nearest_wall_sind = gdf_sindex.nearest(point_from, return_all=False, max_distance=max_dist)
        nearest_wall = buildings_lines_in_buffer.loc[nearest_wall_sind[1]].iloc[0]
        wall_points = [Point(coords) for coords in nearest_wall.geometry.coords]

        # Calculate angles and sort by angle
        points_with_angle = sorted(
            [(pt, math.atan2(pt.y - point_from.y, pt.x - point_from.x)) for pt in wall_points], key=lambda x: x[1]
        )
        delta_angle = 2 * math.pi + points_with_angle[0][1] - points_with_angle[-1][1]
        if round(delta_angle, 10) == round(math.pi, 10):
            wall_b_centroid = obstacles_in_buffer.loc[nearest_wall["index"]].centroid
            p1 = get_point_from_a_thorough_b(point_from, points_with_angle[0][0], max_dist)
            p2 = get_point_from_a_thorough_b(point_from, points_with_angle[1][0], max_dist)
            polygon = LineString([p1, p2])
            polygon = polygon.buffer(
                distance=max_dist * point_side_of_line(polygon, wall_b_centroid), single_sided=True
            )
        else:
            if delta_angle > math.pi:
                delta_angle = 2 * math.pi - delta_angle
            a = math.sqrt((max_dist**2) * (1 + (math.tan(delta_angle / 2) ** 2)))
            p1 = get_point_from_a_thorough_b(point_from, points_with_angle[0][0], a)
            p2 = get_point_from_a_thorough_b(point_from, points_with_angle[-1][0], a)
            polygon = Polygon([points_with_angle[0][0], p1, p2, points_with_angle[1][0]])

        polygons.append(polygon)
        buildings_lines_in_buffer.drop(nearest_wall_sind[1], inplace=True)

        if not polygon.is_valid or polygon.area < 1:
            buildings_lines_in_buffer.reset_index(drop=True, inplace=True)
            continue

        lines_to_kick = buildings_lines_in_buffer.within(polygon)
        buildings_lines_in_buffer = buildings_lines_in_buffer.loc[~lines_to_kick]
        buildings_lines_in_buffer.reset_index(drop=True, inplace=True)
    logger.debug("Done calculating!")
    res = point_buffer.difference(unary_union(polygons + obstacles_in_buffer.to_list()))

    if isinstance(res, MultiPolygon):
        res = list(res.geoms)
        for polygon in res:
            if polygon.intersects(point_from):
                res = polygon
                break

    if return_gdf:
        res = gpd.GeoDataFrame(geometry=[res], crs=local_crs).to_crs(original_crs)

    if return_max_view_dist:
        return res, find_furthest_point(point_from, res)
    return res

graph_to_gdf(graph: nx.MultiDiGraph | nx.Graph | nx.DiGraph, edges: bool = True, nodes: bool = True, restore_edge_geom=False) -> gpd.GeoDataFrame | tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]

Converts nx graph to gpd.GeoDataFrame as edges.

Parameters:

Name Type Description Default
graph MultiDiGraph

The graph to convert.

required
edges bool

Keep edges in GoeDataFrame.

True
nodes bool

Keep nodes in GoeDataFrame.

True
restore_edge_geom bool

if True, will try to restore edge geometry from nodes.

False

Returns: (gpd.GeoDataFrame | tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]): Graph representation in GeoDataFrame format, either nodes or nodes,or tuple of them nodes,edges.

Source code in src\objectnat\methods\utils\graph_utils.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def graph_to_gdf(
    graph: nx.MultiDiGraph | nx.Graph | nx.DiGraph, edges: bool = True, nodes: bool = True, restore_edge_geom=False
) -> gpd.GeoDataFrame | tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Converts nx graph to gpd.GeoDataFrame as edges.

    Parameters:
        graph (nx.MultiDiGraph):
            The graph to convert.
        edges (bool):
            Keep edges in GoeDataFrame.
        nodes (bool):
            Keep nodes in GoeDataFrame.
        restore_edge_geom (bool):
            if True, will try to restore edge geometry from nodes.
    Returns:
        (gpd.GeoDataFrame | tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]):
            Graph representation in GeoDataFrame format, either nodes or nodes,or tuple of them nodes,edges.
    """
    try:
        crs = graph.graph["crs"]
    except KeyError as exc:
        raise ValueError("Graph does not have crs attribute") from exc
    if not edges and not nodes:
        raise AttributeError("Neither edges or nodes were selected")
    if nodes and not edges:
        nodes_gdf = _nodes_to_gdf(graph, crs)
        return nodes_gdf
    if not nodes and edges:
        edges_gdf = _edges_to_gdf(graph, crs)
        if restore_edge_geom:
            nodes_gdf = _nodes_to_gdf(graph, crs)
            edges_gdf = _restore_edges_geom(nodes_gdf, edges_gdf)
        return edges_gdf

    nodes_gdf = _nodes_to_gdf(graph, crs)
    edges_gdf = _edges_to_gdf(graph, crs)
    if restore_edge_geom:
        edges_gdf = _restore_edges_geom(nodes_gdf, edges_gdf)
    return nodes_gdf, edges_gdf

simulate_noise(source_points: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, source_noise_db: float = None, geometric_mean_freq_hz: float = None, **kwargs)

Simulates noise propagation from a set of source points considering obstacles, trees, and environmental factors.

Parameters:

Name Type Description Default
source_points GeoDataFrame

A GeoDataFrame with one or more point geometries representing noise sources. Optionally, it can include 'source_noise_db' and 'geometric_mean_freq_hz' columns for per-point simulation.

required
obstacles GeoDataFrame

A GeoDataFrame representing obstacles in the environment. If a column with sound absorption coefficients is present, its name should be provided in the absorb_ratio_column argument. Missing values will be filled with the standart_absorb_ratio.

required
source_noise_db (float

Default noise level (dB) to use if not specified per-point. Decibels are logarithmic units used to measure sound intensity. A value of 20 dB represents a barely audible whisper, while 140 dB is comparable to the noise of jet engines.

None
geometric_mean_freq_hz float

Default frequency (Hz) to use if not specified per-point. This parameter influences the sound wave's propagation and scattering in the presence of trees. Lower frequencies travel longer distances than higher frequencies. It's recommended to use values between 63 Hz and 8000 Hz; values outside this range will be clamped to the nearest boundary for the sound absorption coefficient calculation.

None
Optional kwargs
  • absorb_ratio_column (str, optional): The name of the column in the obstacles GeoDataFrame that contains the sound absorption coefficients for each obstacle. Default is None. If not specified, all obstacles will have the standart_absorb_ratio.
  • standart_absorb_ratio (float, optional): The default sound absorption coefficient to use for obstacles without specified values in the absorb_ratio_column. Default is 0.05, which is a typical value for concrete walls.
  • trees (gpd.GeoDataFrame, optional): A GeoDataFrame containing trees or dense vegetation along the sound wave's path. Trees will scatter and absorb sound waves.
  • tree_resolution (int, optional): A resolution parameter for simulating tree interactions with sound waves. Recommended values are between 2 and 16, with higher values providing more accurate simulation results.
  • air_temperature (float, optional): The air temperature in degrees Celsius. The recommended range is from 0 to 30 degrees Celsius, as temperatures outside this range will be clipped. Temperature affects the sound propagation in the air.
  • target_noise_db (float, optional): The target noise level (in dB) for the simulation. Default is 40 dB. Lower values may not be relevant for further analysis, as they are near the threshold of human hearing.
  • db_sim_step (float, optional): The step size in decibels for the noise simulation. Default is 1. For more precise analysis, this can be adjusted. If the difference between source_noise_db and target_noise_db is not divisible by the step size, the function will raise an error.
  • reflection_n (int, optional): The maximum number of reflections (bounces) to simulate for each sound wave. Recommended values are between 1 and 3. Larger values will result in longer simulation times.
  • dead_area_r (float, optional): A debugging parameter that defines the radius of the "dead zone" for reflections. Points within this area will not generate reflections. This is useful to prevent the algorithm from getting stuck in corners or along building walls.
  • use_parallel (bool, optional): Whether to use ProcessPool for task distribution or not. Default is True.

Returns: (gpd.GeoDataFrame): A GeoDataFrame containing the noise simulation results, including noise levels and geometries of the affected areas. Each point's simulation results will be merged into a single GeoDataFrame.

Source code in src\objectnat\methods\noise\noise_simulation.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def simulate_noise(
    source_points: gpd.GeoDataFrame,
    obstacles: gpd.GeoDataFrame,
    source_noise_db: float = None,
    geometric_mean_freq_hz: float = None,
    **kwargs,
):
    """
    Simulates noise propagation from a set of source points considering obstacles, trees, and environmental factors.

    Parameters:
        source_points (gpd.GeoDataFrame):
            A GeoDataFrame with one or more point geometries representing noise sources.
            Optionally, it can include 'source_noise_db' and 'geometric_mean_freq_hz' columns for per-point simulation.
        obstacles (gpd.GeoDataFrame):
            A GeoDataFrame representing obstacles in the environment. If a column with sound absorption coefficients
            is present, its name should be provided in the `absorb_ratio_column` argument.
            Missing values will be filled with the `standart_absorb_ratio`.
        source_noise_db  (float, optional):
            Default noise level (dB) to use if not specified per-point. Decibels are logarithmic units used to measure
            sound intensity. A value of 20 dB represents a barely audible whisper, while 140 dB is comparable to the
            noise of jet engines.
        geometric_mean_freq_hz (float, optional):
            Default frequency (Hz) to use if not specified per-point. This parameter influences the sound wave's
            propagation and scattering in the presence of trees. Lower frequencies travel longer distances than higher
            frequencies. It's recommended to use values between 63 Hz and 8000 Hz; values outside this range will be
            clamped to the nearest boundary for the sound absorption coefficient calculation.

    Optional kwargs:
        - absorb_ratio_column (str, optional): The name of the column in the `obstacles` GeoDataFrame that contains the
            sound absorption coefficients for each obstacle. Default is None. If not specified, all obstacles will have
            the `standart_absorb_ratio`.
        - standart_absorb_ratio (float, optional): The default sound absorption coefficient to use for obstacles without
            specified values in the `absorb_ratio_column`. Default is 0.05, which is a typical value for concrete walls.
        - trees (gpd.GeoDataFrame, optional): A GeoDataFrame containing trees or dense vegetation along the sound wave's
            path. Trees will scatter and absorb sound waves.
        - tree_resolution (int, optional): A resolution parameter for simulating tree interactions with sound waves.
            Recommended values are between 2 and 16, with higher values providing more accurate simulation results.
        - air_temperature (float, optional): The air temperature in degrees Celsius. The recommended range is from 0 to
            30 degrees Celsius, as temperatures outside this range will be clipped. Temperature affects the sound
            propagation in the air.
        - target_noise_db (float, optional): The target noise level (in dB) for the simulation. Default is 40 dB.
            Lower values may not be relevant for further analysis, as they are near the threshold of human hearing.
        - db_sim_step (float, optional): The step size in decibels for the noise simulation. Default is 1. For more
            precise analysis, this can be adjusted. If the difference between `source_noise_db` and `target_noise_db`
            is not divisible by the step size, the function will raise an error.
        - reflection_n (int, optional): The maximum number of reflections (bounces) to simulate for each sound wave.
            Recommended values are between 1 and 3. Larger values will result in longer simulation times.
        - dead_area_r (float, optional): A debugging parameter that defines the radius of the "dead zone" for reflections.
            Points within this area will not generate reflections. This is useful to prevent the algorithm from getting
            stuck in corners or along building walls.
        - use_parallel (bool, optional): Whether to use ProcessPool for task distribution or not. Default is True.
    Returns:
        (gpd.GeoDataFrame): A GeoDataFrame containing the noise simulation results, including noise levels and geometries
            of the affected areas. Each point's simulation results will be merged into a single GeoDataFrame.
    """
    # Obstacles args
    absorb_ratio_column = kwargs.get("absorb_ratio_column", None)
    standart_absorb_ratio = kwargs.get("standart_absorb_ratio", 0.05)

    # Trees args
    trees = kwargs.get("trees", None)
    tree_res = kwargs.get("tree_resolution", 4)

    # Simulation conditions
    air_temperature = kwargs.get("air_temperature", 20)
    target_noise_db = kwargs.get("target_noise_db", 40)

    # Simulation params
    db_sim_step = kwargs.get("db_sim_step", 1)
    reflection_n = kwargs.get("reflection_n", 3)
    dead_area_r = kwargs.get("dead_area_r", 5)

    # Use paralleling
    use_parallel = kwargs.get("use_parallel", True)

    # Validate optional columns or default values
    use_column_db = False
    if "source_noise_db" in source_points.columns:
        if (source_points["source_noise_db"] > MAX_DB_VALUE).any():
            raise ValueError(
                f"One or more values in 'source_noise_db' column exceed the physical limit of {MAX_DB_VALUE} dB."
            )
        if source_points["source_noise_db"].isnull().any():
            raise ValueError(f"Column 'source_noise_db' contains missing (NaN) values")
        use_column_db = True

    use_column_freq = False
    if "geometric_mean_freq_hz" in source_points.columns:
        if source_points["geometric_mean_freq_hz"].isnull().any():
            raise ValueError(f"Column 'geometric_mean_freq_hz' contains missing (NaN) values")
        use_column_freq = True

    if not use_column_db:
        if source_noise_db is None:
            raise ValueError(
                "Either `source_noise_db` must be provided or the `source_points` must contain a 'source_noise_db' column."
            )
        if source_noise_db > MAX_DB_VALUE:
            raise ValueError(
                f"source_noise_db ({source_noise_db} dB) exceeds the physical limit of {MAX_DB_VALUE} dB in air."
            )

    if not use_column_freq:
        if geometric_mean_freq_hz is None:
            raise ValueError(
                "Either `geometric_mean_freq_hz` must be provided or the `source_points` must contain a 'geometric_mean_freq_hz' column."
            )
    if not use_column_db and not use_column_freq and len(source_points) > 1:
        logger.warning(
            "`source_noise_db` and `geometric_mean_freq_hz` will be used for all points. Per-point simulation parameters not found."
        )

    original_crs = source_points.crs
    source_points = source_points.copy()

    source_points = source_points.copy()
    if len(obstacles) > 0:
        obstacles = obstacles.copy()
        obstacles.geometry = obstacles.geometry.simplify(tolerance=1)
        local_crs = obstacles.estimate_utm_crs()
        obstacles.to_crs(local_crs, inplace=True)
        source_points.to_crs(local_crs, inplace=True)
    else:
        local_crs = source_points.estimate_utm_crs()
        source_points.to_crs(local_crs, inplace=True)
        source_points.reset_index(drop=True)
        source_points.geometry = source_points.centroid

    # Simplifying trees
    if trees is not None:
        trees = trees.copy()
        trees.to_crs(local_crs, inplace=True)
        trees.geometry = trees.geometry.simplify(tolerance=1)
    else:
        trees = gpd.GeoDataFrame()

    if absorb_ratio_column is None:
        obstacles["absorb_ratio"] = standart_absorb_ratio
    else:
        obstacles["absorb_ratio"] = obstacles[absorb_ratio_column].fillna(standart_absorb_ratio)
    obstacles = obstacles[["absorb_ratio", "geometry"]]

    # creating initial task and simulating for each point
    task_queue = multiprocessing.Queue()
    dead_area_dict = {}
    for ind, row in source_points.iterrows():
        source_point = row.geometry
        local_db = row["source_noise_db"] if use_column_db else source_noise_db
        local_freq = row["geometric_mean_freq_hz"] if use_column_freq else geometric_mean_freq_hz

        # calculating layer dist and db values
        dist_db = [(0, local_db)]
        cur_db = local_db - db_sim_step
        while cur_db > target_noise_db - db_sim_step:
            if cur_db - db_sim_step < target_noise_db:
                cur_db = target_noise_db
            max_dist = dist_to_target_db(local_db, cur_db, local_freq, air_temperature)
            dist_db.append((max_dist, cur_db))
            cur_db -= db_sim_step

        args = (source_point, obstacles, trees, 0, 0, dist_db)
        kwargs = {
            "reflection_n": reflection_n,
            "geometric_mean_freq_hz": local_freq,
            "tree_res": tree_res,
            "min_db": target_noise_db,
            "simulation_ind": ind,
        }
        task_queue.put((_noise_from_point_task, args, kwargs))
        dead_area_dict[ind] = source_point.buffer(dead_area_r, resolution=2)

    noise_gdf = _recursive_simulation_queue(
        task_queue, dead_area_dict=dead_area_dict, dead_area_r=dead_area_r, use_parallel=use_parallel
    )

    noise_gdf = gpd.GeoDataFrame(pd.concat(noise_gdf, ignore_index=True), crs=local_crs)
    polygons = gpd.GeoDataFrame(
        geometry=list(polygonize(noise_gdf.geometry.apply(polygons_to_multilinestring).union_all())), crs=local_crs
    )
    polygons_points = polygons.copy()
    polygons_points.geometry = polygons.representative_point()
    sim_result = polygons_points.sjoin(noise_gdf, predicate="within").reset_index()
    sim_result = sim_result.groupby("index").agg({"noise_level": "max"})
    sim_result["geometry"] = polygons
    sim_result = (
        gpd.GeoDataFrame(sim_result, geometry="geometry", crs=local_crs).dissolve(by="noise_level").reset_index()
    )

    return sim_result.to_crs(original_crs)