Source code for objectnat.methods.visibility.visibility_analysis

import math
from concurrent.futures import ProcessPoolExecutor
from typing import Literal

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely import LineString, MultiPolygon, Point, Polygon
from shapely.ops import unary_union
from tqdm.contrib.concurrent import process_map

from objectnat import config
from objectnat.methods.utils.geom_utils import (
    explode_linestring,
    get_point_from_a_thorough_b,
    point_side_of_line,
    polygons_to_multilinestring,
)

logger = config.logger
enable_tqdm = config.enable_tqdm_bar


def dist_to_furthest_point(point_from: Point, view_polygon: Polygon) -> float:
    """
    Compute the maximum distance from an observer point to the boundary of a visibility polygon.

    The function measures the Euclidean distance from the observer point to all
    vertices of the polygon's exterior ring and returns the maximum distance,
    rounded to one decimal place.

    Args:
        point_from (Point):
            Observer location in the same CRS as ``view_polygon``.
        view_polygon (Polygon):
            Polygon representing the visibility area or any polygonal geometry
            around the observer.

    Returns:
        float:
            Maximum distance from ``point_from`` to the vertices of
            ``view_polygon``, rounded to one decimal place.
    """
    try:
        coords = np.asarray(view_polygon.exterior.coords, dtype="float64")
        dx = coords[:, 0] - point_from.x
        dy = coords[:, 1] - point_from.y
        res = float(np.sqrt(dx * dx + dy * dy).max())
        return round(res, 1)
    except Exception as e:
        print(view_polygon)
        raise e


def _visibility_accurate(point_from: Point, obstacles: gpd.GeoDataFrame, view_distance: float) -> Polygon:
    """
    Compute a high-accuracy visibility polygon for a single observer point.

    This function implements a detailed line-of-sight algorithm based on obstacle
    boundaries:

    * A circular buffer with radius ``view_distance`` is built around the
      observer point.
    * Obstacles intersecting this buffer are selected.
    * Polygonal obstacles are converted to line boundaries and exploded into
      individual segments.
    * For each nearest wall segment, a visibility wedge (sector) is constructed
      using angular relationships between wall endpoints and the observer.
    * All wedges are combined and subtracted from the initial visibility buffer
      together with obstacles to obtain the final visible area.

    Compared to ``_visibility_simple()``, this method is more accurate in
    complex environments (e.g., dense buildings, narrow streets), but it is also
    significantly slower.

    Args:
        point_from (Point):
            Observer location in projected coordinates.
        obstacles (gpd.GeoDataFrame):
            GeoDataFrame with obstacle geometries (buildings, walls, etc.) in
            the same CRS as ``point_from``. Expected geometry types are
            Polygon, MultiPolygon, LineString or MultiLineString.
        view_distance (float):
            Maximum viewing radius around the observer, in units of the CRS.

    Returns:
        Polygon:
            Polygon representing the visible area around ``point_from`` within
            ``view_distance``, after subtracting obstacles. If the result is a
            MultiPolygon, the component intersecting the observer point is
            returned.

    Notes:
        It is assumed that coordinates are in a metric CRS (e.g., UTM). CRS
        management is done externally in ``get_visibility()``.
    """
    point_buffer = point_from.buffer(view_distance)
    sindex = obstacles.sindex
    idx = list(sindex.query(point_buffer, predicate="intersects"))
    if not idx:
        return point_buffer
    obstacles = obstacles.iloc[idx].reset_index(drop=True)

    buildings_lines_in_buffer = gpd.GeoSeries(
        pd.Series(
            obstacles.geometry.apply(polygons_to_multilinestring).explode(index_parts=False).apply(explode_linestring)
        ).explode()
    )

    buildings_lines_in_buffer = buildings_lines_in_buffer.loc[buildings_lines_in_buffer.intersects(point_buffer)]

    coords = [line.coords[0] for line in buildings_lines_in_buffer.geometry] + [
        line.coords[-1] for line in buildings_lines_in_buffer.geometry
    ]

    coords = np.asarray(coords, dtype="float64")
    dx = coords[:, 0] - point_from.x
    dy = coords[:, 1] - point_from.y
    max_dist_lines = float(np.sqrt(dx * dx + dy * dy).max())

    max_dist = max(view_distance, max_dist_lines)

    polygons = []
    buildings_lines_in_buffer = gpd.GeoDataFrame(geometry=buildings_lines_in_buffer, crs=obstacles.crs).reset_index()
    logger.debug("Calculation vis polygon")
    while not buildings_lines_in_buffer.empty:
        gdf_sindex = buildings_lines_in_buffer.sindex
        # TODO check if 2 walls are nearest and use the widest angle between points
        _, nearest_wall_sind = gdf_sindex.nearest(point_from, return_all=False, max_distance=max_dist)
        nearest_wall = buildings_lines_in_buffer.loc[nearest_wall_sind].iloc[0]
        wall_points = [Point(coords) for coords in nearest_wall.geometry.coords]

        # Calculate angles and sort by angle
        points_with_angle = sorted(
            [(pt, math.atan2(pt.y - point_from.y, pt.x - point_from.x)) for pt in wall_points], key=lambda x: x[1]
        )
        delta_angle = 2 * math.pi + points_with_angle[0][1] - points_with_angle[-1][1]
        if round(delta_angle, 10) == round(math.pi, 10):
            wall_b_centroid = obstacles.loc[nearest_wall["index"]].centroid
            p1 = get_point_from_a_thorough_b(point_from, points_with_angle[0][0], max_dist)
            p2 = get_point_from_a_thorough_b(point_from, points_with_angle[1][0], max_dist)
            polygon = LineString([p1, p2])
            polygon = polygon.buffer(
                distance=max_dist * point_side_of_line(polygon, wall_b_centroid), single_sided=True
            )
        else:
            if delta_angle > math.pi:
                delta_angle = 2 * math.pi - delta_angle
            a = math.sqrt((max_dist**2) * (1 + (math.tan(delta_angle / 2) ** 2)))
            p1 = get_point_from_a_thorough_b(point_from, points_with_angle[0][0], a)
            p2 = get_point_from_a_thorough_b(point_from, points_with_angle[-1][0], a)
            polygon = Polygon([points_with_angle[0][0], p1, p2, points_with_angle[1][0]])

        if not polygon.is_valid or polygon.area < 1:
            buildings_lines_in_buffer.drop(nearest_wall_sind, inplace=True)
            buildings_lines_in_buffer.reset_index(drop=True, inplace=True)
            continue
        polygons.append(polygon)
        candidate_idx = list(gdf_sindex.query(polygon, predicate="intersects"))
        if candidate_idx:
            candidates = buildings_lines_in_buffer.loc[candidate_idx]
            mask_within = candidates.within(polygon)
            to_drop = candidates.index[mask_within]
        else:
            to_drop = pd.Index([])
        to_drop = to_drop.append(pd.Index(nearest_wall_sind))
        buildings_lines_in_buffer.drop(index=to_drop, inplace=True)
        buildings_lines_in_buffer.reset_index(drop=True, inplace=True)
    logger.debug("Done calculating!")
    vis_poly = point_buffer.difference(unary_union(polygons + obstacles.geometry.to_list()))

    if isinstance(vis_poly, MultiPolygon):
        vis_poly = list(vis_poly.geoms)
        for polygon in vis_poly:
            if polygon.intersects(point_from):
                vis_poly = polygon
                break
    return vis_poly


def _visibility_simple(
    point_from: Point, obstacles: gpd.GeoDataFrame, view_distance: float, resolution: int
) -> Polygon:
    """
    Compute a fast, approximate visibility polygon for a single observer point.

    This function provides a simplified line-of-sight estimate using radial
    rays:

    * A circular buffer with radius ``view_distance`` is created around the
      observer.
    * The buffer is discretized into multiple directions using the
      ``quad_segs`` parameter (``resolution``).
    * For each direction, a line segment is drawn from the observer to the
      buffer boundary.
    * These lines are cut by the union of obstacles, removing occluded parts.
    * The endpoints of the remaining visible segments form an approximate
      visibility contour.

    Compared to ``_visibility_accurate()``, this method is much faster but
    less precise, especially in highly complex or detailed urban scenes.

    Args:
        point_from (Point):
            Observer location in projected coordinates.
        obstacles (gpd.GeoDataFrame):
            GeoDataFrame with obstacle geometries in the same CRS as
            ``point_from``.
        view_distance (float):
            Maximum viewing radius around the observer, in units of the CRS.
        resolution (int):
            Angular resolution of the buffer. Passed as ``quad_segs`` to
            ``Point.buffer()``. Higher values produce smoother and more
            detailed visibility contours but increase computation time.

    Returns:
        Polygon:
            Approximate visibility polygon from ``point_from`` within
            ``view_distance``, clipped by ``obstacles``.
    """
    point_buffer = point_from.buffer(view_distance, quad_segs=resolution)
    sindex = obstacles.sindex
    idx = list(sindex.query(point_buffer, predicate="intersects"))
    if not idx:
        return point_buffer
    obstacles = obstacles.iloc[idx].reset_index(drop=True)

    buffer_exterior_ = list(point_buffer.exterior.coords)
    line_geometry = [LineString([point_from, ext]) for ext in buffer_exterior_]
    buffer_lines_gdf = gpd.GeoDataFrame(geometry=line_geometry)
    united_buildings = obstacles.union_all()
    if united_buildings:
        splited_lines = buffer_lines_gdf["geometry"].apply(lambda x: x.difference(united_buildings))
    else:
        splited_lines = buffer_lines_gdf["geometry"]

    splited_lines_gdf = gpd.GeoDataFrame(geometry=splited_lines).explode(index_parts=True)
    splited_lines_list = []

    for _, v in splited_lines_gdf.groupby(level=0):
        splited_lines_list.append(v.iloc[0]["geometry"].coords[-1])
    circuit = Polygon(splited_lines_list)
    if united_buildings:
        circuit = circuit.difference(united_buildings)

    return circuit


def _visibility_worker(args: tuple[Point, gpd.GeoDataFrame | None, float, str, int]) -> Polygon:
    """
    Worker function for computing visibility for a single observer point.

    This helper is designed to be used with process-based parallelization
    (e.g., ``ProcessPoolExecutor`` or ``tqdm.contrib.concurrent.process_map``).
    It unpacks the argument tuple and dispatches to either the accurate or
    simple visibility algorithm.

    Args:
        args (tuple[Point, gpd.GeoDataFrame | None, float, str, int]):
            A 5-tuple containing:

            * point_geom (Point): Observer location in local projected CRS.
            * obstacles (gpd.GeoDataFrame | None): Obstacles in the same CRS.
              If ``None`` or empty, no occlusion is applied.
            * view_distance (float): Viewing radius for this particular point.
            * method (str): Visibility algorithm to use. Must be either
              ``"accurate"`` or ``"simple"``.
            * resolution (int): Resolution parameter passed to the simple
              method (ignored for the accurate method).

    Returns:
        Polygon:
            Visibility polygon for the given observer and parameters. If there
            are no obstacles, this is a circular-like buffer. If the observer
            lies inside an obstacle, an empty polygon is returned.
    """
    point_geom, obstacles, view_distance, method, resolution = args

    if obstacles is None or len(obstacles) == 0:
        point_buffer = point_geom.buffer(
            view_distance,
            quad_segs=(32 if method == "accurate" else resolution),
        )
        return point_buffer

    if obstacles.contains(point_geom).any():
        return Polygon()

    if method == "accurate":
        return _visibility_accurate(point_geom, obstacles, view_distance)
    elif method == "simple":
        return _visibility_simple(point_geom, obstacles, view_distance, resolution)
    else:
        raise ValueError("method must be one of: 'accurate', 'fast'")


[docs] def get_visibility( point_from: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: float | None = None, method: Literal["accurate", "simple"] = "accurate", *, resolution: int = 32, parallel: bool = False, max_workers: int | None = None, ) -> gpd.GeoDataFrame: """ Compute visibility polygons for one or many observer points. This is a high-level, batch interface to the visibility analysis: * Accepts a GeoDataFrame of observer points. * Reprojects points and obstacles to a locally estimated projected CRS (typically UTM) for distance-accurate calculations. * For each point, computes a visibility polygon using either: * an accurate wall-based algorithm (``method="accurate"``), or * a simple ray-based approximation (``method="simple"``). * Supports per-point visibility radii via a dedicated column or a global ``view_distance`` value. * Optionally parallelizes computation across points using processes and can display a tqdm progress bar. Args: point_from (gpd.GeoDataFrame): GeoDataFrame with point geometries representing observer locations. Must have a valid CRS. Any additional attributes are preserved in the output. obstacles (gpd.GeoDataFrame): GeoDataFrame with obstacle geometries in the same CRS as ``point_from``. If empty or ``None``, no occlusion is applied and visibility is limited only by the viewing radius. view_distance (float | None, optional): Global viewing radius for all points (in units of the CRS). Used when the ``"visibility_distance"`` column is not present in ``point_from``. If ``None`` and the column is also missing, a ``ValueError`` is raised. method (Literal["accurate", "simple"], optional): Visibility algorithm to use: * ``"accurate"`` – slower but more precise, based on obstacle boundaries and visibility wedges. * ``"simple"`` – faster approximation with radial rays and obstacle cutting. resolution (int, optional): Resolution parameter for the simple method. Passed as ``quad_segs`` to ``Point.buffer()``; ignored when ``method="accurate"``. parallel (bool, optional): If ``True``, compute visibility polygons for multiple points in parallel using processes. If ``False``, process points sequentially in the current process. max_workers (int | None, optional): Maximum number of worker processes when ``parallel=True``. If ``None``, the default from ``ProcessPoolExecutor`` is used. Returns: gpd.GeoDataFrame: GeoDataFrame with the same index and attributes as ``point_from``, but with the geometry column replaced by visibility polygons. The result is returned in the original CRS of ``point_from``. Raises: TypeError: If ``point_from`` is not a GeoDataFrame. ValueError: If ``point_from`` is empty, has no CRS, or neither ``view_distance`` nor the ``"visibility_distance"`` column is provided. Notes: * If a column named ``"visibility_distance"`` is present in ``point_from``, its values are used as per-point view distances and the ``view_distance`` argument is ignored. * When ``parallel=True`` and ``enable_tqdm`` is True in the global config, a progress bar is displayed using ``tqdm.contrib.concurrent.process_map`` during parallel execution. Differences between methods --------------------------- * ``method="accurate"``: Uses obstacle boundaries and angular visibility wedges. More precise, especially in dense environments and around corners, but slower. * ``method="simple"``: Uses radial rays and line splitting. Much faster and suitable for large batches or rough estimates, but may produce less detailed visibility shapes. """ if not isinstance(point_from, gpd.GeoDataFrame): raise TypeError("point_from must be a GeoDataFrame with a point geometry") if point_from.empty: raise ValueError("GeoDataFrame 'point_from' is empty.") original_crs = point_from.crs local_crs = point_from.estimate_utm_crs() points_local = point_from.to_crs(local_crs) if "visibility_distance" in points_local.columns: distances = points_local["visibility_distance"].to_list() else: if view_distance is None: raise ValueError( "Either provide parameter view_distance or add column 'visibility_distance' to point_from GeoDataFrame." ) distances = [view_distance] * len(points_local) if obstacles is not None and len(obstacles) > 0: obstacles_local = obstacles.to_crs(local_crs) else: obstacles_local = None tasks = [(geom, obstacles_local, dist, method, resolution) for geom, dist in zip(points_local.geometry, distances)] logger.info("started") if not parallel: results = [_visibility_worker(t) for t in tasks] logger.info("done seq ") else: if enable_tqdm: results = process_map( _visibility_worker, tasks, max_workers=max_workers, chunksize=1, desc="Visibility", ) else: with ProcessPoolExecutor(max_workers=max_workers) as executor: results = list(executor.map(_visibility_worker, tasks)) logger.info("done parallel") result_gdf = points_local.copy() result_gdf["geometry"] = results result_gdf.set_crs(local_crs, inplace=True) result_gdf = result_gdf.to_crs(original_crs) return result_gdf
# def get_visibilities_from_points( # points: gpd.GeoDataFrame, # obstacles: gpd.GeoDataFrame, # view_distance: int, # sectors_n=None, # max_workers: int = cpu_count(), # ) -> list[Polygon]: # """ # Calculate visibility polygons from a set of points considering obstacles within a specified view distance. # # Args: # points (gpd.GeoDataFrame): # GeoDataFrame containing the points from which visibility is calculated. # obstacles (gpd.GeoDataFrame): # GeoDataFrame containing the obstacles that block visibility. # view_distance (int): # The maximum distance from each point within which visibility is calculated. # sectors_n (int, optional): # Number of sectors to divide the view into for more detailed visibility calculations. Defaults to None. # max_workers (int, optional): # Maximum workers in multiproccesing, multipocessing.cpu_count() by default. # # Returns: # (list[Polygon]): # A list of visibility polygons for each input point. # # Notes: # This function uses `get_visibility_accurate()` in multiprocessing way. # # """ # if points.crs != obstacles.crs: # raise ValueError(f"CRS mismatch, points crs:{points.crs} != obstacles crs:{obstacles.crs}") # if points.crs.is_geographic: # logger.warning("Points crs is geographic, it may produce invalid results") # # remove points inside polygons # joined = gpd.sjoin(points, obstacles, how="left", predicate="intersects") # points = joined[joined.index_right.isnull()] # # # remove unused obstacles # points_view = points.geometry.buffer(view_distance).union_all() # s = obstacles.intersects(points_view) # buildings_in_buffer = obstacles.loc[s[s].index].reset_index(drop=True) # # buildings_in_buffer.geometry = buildings_in_buffer.geometry.apply( # lambda geom: MultiPolygon([geom]) if isinstance(geom, Polygon) else geom # ) # args = [(point, buildings_in_buffer, view_distance, sectors_n) for point in points.geometry] # all_visions = process_map( # _multiprocess_get_vis, # args, # chunksize=5, # desc="Calculating Visibility Catchment Area from each Point, it might take a while for a " # "big amount of points", # max_workers=max_workers, # ) # # # could return sectorized visions if sectors_n is set # return all_visions # def calculate_visibility_catchment_area( # points: gpd.GeoDataFrame, obstacles: gpd.GeoDataFrame, view_distance: int | float, max_workers: int = cpu_count() # ) -> gpd.GeoDataFrame: # pragma: no cover # """ # Calculate visibility catchment areas for a large urban area based on given points and obstacles. # This function is designed to work with at least 1000 points spaced 10-20 meters apart for optimal results. # Points can be generated using a road graph. # # Args: # points (gpd.GeoDataFrame): GeoDataFrame containing the points from which visibility is calculated. # obstacles (gpd.GeoDataFrame): GeoDataFrame containing the obstacles that block visibility. # view_distance (int | float): The maximum distance from each point within which visibility is calculated. # max_workers (int): Maximum workers in multiproccesing, multipocessing.cpu_count() by default. # # Returns: # (gpd.GeoDataFrame): GeoDataFrame containing the calculated visibility catchment areas. # """ # # def filter_geoms(x): # if x.geom_type == "GeometryCollection": # return MultiPolygon([y for y in x.geoms if y.geom_type in ["Polygon", "MultiPolygon"]]) # return x # # def calc_group_factor(x): # return np.mean(x.new_ratio) * x.count_n # # def unary_union_groups(x): # return unary_union(MultiPolygon(list(x["geometry"])).buffer(0)) # # raise NotImplementedError("This method is temporarily unsupported.") # # local_crs = obstacles.estimate_utm_crs() # obstacles = obstacles.to_crs(local_crs) # points = points.to_crs(local_crs) # # sectors_n = 12 # logger.info("Calculating Visibility Catchment Area from each point") # all_visions_sectorized = get_visibilities_from_points(points, obstacles, view_distance, sectors_n, max_workers) # all_visions_sectorized = gpd.GeoDataFrame( # geometry=[item for sublist in all_visions_sectorized for item in sublist], crs=local_crs # ) # logger.info("Calculating non-vision part...") # all_visions_unary = all_visions_sectorized.union_all() # convex = all_visions_unary.convex_hull # dif = convex.difference(all_visions_unary) # # del convex, all_visions_unary # # buf_area = (math.pi * view_distance**2) / sectors_n # all_visions_sectorized["ratio"] = all_visions_sectorized.area / buf_area # all_visions_sectorized["ratio"] = min_max_normalization( # all_visions_sectorized["ratio"].values, new_min=1, new_max=10 # ) # groups = all_visions_sectorized.sample(frac=1).groupby(all_visions_sectorized.index // 6000) # groups = [group for _, group in groups] # # del all_visions_sectorized # # groups_result = process_map( # _process_group, # groups, # desc="Counting intersections in each group...", # max_workers=max_workers, # ) # logger.info("Calculating all groups intersection...") # all_in = combine_geometry(gpd.GeoDataFrame(data=pd.concat(groups_result), geometry="geometry", crs=local_crs)) # # del groups_result # # all_in["count_n"] = all_in["index_right"].apply(len) # # logger.info("Calculating intersection's parameters") # # all_in["factor"] = all_in.parallel_apply(calc_group_factor, axis=1) # TODO replace pandarallel methods # threshold = all_in["factor"].quantile(0.3) # all_in = all_in[all_in["factor"] > threshold] # # all_in["factor_normalized"] = np.round( # min_max_normalization(np.sqrt(all_in["factor"].values), new_min=1, new_max=5) # ).astype(int) # logger.info("Calculating normalized groups geometry...") # all_in = ( # all_in.groupby("factor_normalized").parallel_apply(unary_union_groups).reset_index() # ) # TODO replace pandarallel methods # all_in = gpd.GeoDataFrame(data=all_in.rename(columns={0: "geometry"}), geometry="geometry", crs=32636) # # all_in = all_in.explode(index_parts=True).reset_index(drop=True) # all_in["area"] = all_in.area # threshold = all_in["area"].quantile(0.9) # all_in = all_in[all_in["area"] > threshold] # all_in = all_in.groupby("factor_normalized").apply(unary_union_groups).reset_index() # all_in = gpd.GeoDataFrame(data=all_in.rename(columns={0: "geometry"}), geometry="geometry", crs=32636) # # all_in.geometry = all_in.geometry.buffer(20).buffer(-20).difference(dif) # # all_in.sort_values(by="factor_normalized", ascending=False, inplace=True) # all_in.reset_index(drop=True, inplace=True) # logger.info("Smoothing normalized groups geometry...") # for ind, row in all_in.iloc[:-1].iterrows(): # for ind2 in range(ind + 1, len(all_in)): # current_geometry = all_in.at[ind2, "geometry"] # all_in.at[ind2, "geometry"] = current_geometry.difference(row.geometry) # all_in["geometry"] = all_in["geometry"].apply(filter_geoms) # # all_in = all_in.explode(index_parts=True) # logger.info("Done!") # return all_in # # # def _multiprocess_get_vis(args): # pragma: no cover # point, buildings, view_distance, sectors_n = args # result = get_visibility_accurate(point, buildings, view_distance) # # if sectors_n is not None: # sectors = [] # # cx, cy = point.x, point.y # # angle_increment = 2 * math.pi / sectors_n # view_distance = math.sqrt((view_distance**2) * (1 + (math.tan(angle_increment / 2) ** 2))) # for i in range(sectors_n): # angle1 = i * angle_increment # angle2 = (i + 1) * angle_increment # # x1, y1 = cx + view_distance * math.cos(angle1), cy + view_distance * math.sin(angle1) # x2, y2 = cx + view_distance * math.cos(angle2), cy + view_distance * math.sin(angle2) # # sector_triangle = Polygon([point, (x1, y1), (x2, y2)]) # sector = result.intersection(sector_triangle) # # if not sector.is_empty: # sectors.append(sector) # result = sectors # return result # # # def _process_group(group): # pragma: no cover # geom = group # combined_geometry = combine_geometry(geom) # combined_geometry.drop(columns=["index", "index_right"], inplace=True) # combined_geometry["count_n"] = combined_geometry["ratio"].apply(len) # combined_geometry["new_ratio"] = combined_geometry.apply( # lambda x: np.power(np.prod(x.ratio), 1 / x.count_n) * x.count_n, axis=1 # ) # # threshold = combined_geometry["new_ratio"].quantile(0.25) # combined_geometry = combined_geometry[combined_geometry["new_ratio"] > threshold] # # combined_geometry["new_ratio_normalized"] = min_max_normalization( # combined_geometry["new_ratio"].values, new_min=1, new_max=10 # ) # # combined_geometry["new_ratio_normalized"] = np.round(combined_geometry["new_ratio_normalized"]).astype(int) # # result_union = ( # combined_geometry.groupby("new_ratio_normalized") # .agg({"geometry": lambda x: unary_union(MultiPolygon(list(x)).buffer(0))}) # .reset_index(drop=True) # ) # result_union.set_geometry("geometry", inplace=True) # result_union.set_crs(geom.crs, inplace=True) # # result_union = result_union.explode("geometry", index_parts=False).reset_index(drop=True) # # representative_points = combined_geometry.copy() # representative_points["geometry"] = representative_points["geometry"].representative_point() # # joined = gpd.sjoin(result_union, representative_points, how="inner", predicate="contains").reset_index() # joined = joined.groupby("index").agg({"geometry": "first", "new_ratio": lambda x: np.mean(list(x))}) # # joined.set_geometry("geometry", inplace=True) # joined.set_crs(geom.crs, inplace=True) # return joined