Source code for objectnat.methods.point_clustering.cluster_points_in_polygons

from typing import Literal

import geopandas as gpd
import pandas as pd
from sklearn.cluster import DBSCAN, HDBSCAN

from objectnat import config

logger = config.logger


def _get_cluster(services_select, min_dist, min_point, method):
    services_coords = pd.DataFrame(
        {"x": services_select.geometry.representative_point().x, "y": services_select.geometry.representative_point().y}
    )
    if method == "DBSCAN":
        db = DBSCAN(eps=min_dist, min_samples=min_point).fit(services_coords.to_numpy())
    else:
        db = HDBSCAN(min_cluster_size=min_point, cluster_selection_epsilon=min_dist).fit(services_coords.to_numpy())
    services_select["cluster"] = db.labels_
    return services_select


def _get_service_ratio(loc, service_code_column):
    all_services = loc.shape[0]
    loc[service_code_column] = loc[service_code_column].astype(str)
    services_count = loc.groupby(service_code_column).size()
    return (services_count / all_services).round(2)


[docs] def get_clusters_polygon( points: gpd.GeoDataFrame, min_dist: float | int = 100, min_point: int = 5, method: Literal["DBSCAN", "HDBSCAN"] = "HDBSCAN", service_code_column: str = "service_code", ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """ Generate cluster polygons for given points based on a specified minimum distance and minimum points per cluster. Optionally, calculate the relative ratio between types of points within the clusters. Args: points (gpd.GeoDataFrame): GeoDataFrame containing the points to be clustered. Must include a 'service_code' column for service ratio calculations. min_dist (float | int, optional): Minimum distance between points to be considered part of the same cluster. Defaults to 100. min_point (int, optional): Minimum number of points required to form a cluster. Defaults to 5. method: The clustering method to use. Must be either "DBSCAN" or "HDBSCAN". Defaults to "HDBSCAN". service_code_column (str, optional): Column, containing service type for relative ratio in clasterized polygons. Defaults to "service_code". Returns: tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: A tuple containing the clustered polygons GeoDataFrame and the original points GeoDataFrame with cluster labels. """ if method not in ["DBSCAN", "HDBSCAN"]: raise ValueError("Method must be either 'DBSCAN' or 'HDBSCAN'") original_crs = points.crs local_crs = points.estimate_utm_crs() points = points.to_crs(local_crs) services_select = _get_cluster(points, min_dist, min_point, method) if service_code_column not in points.columns: logger.warning( f"No {service_code_column} column in provided GeoDataFrame, cluster polygons will be without relative ratio" ) points[service_code_column] = service_code_column points_normal = services_select[services_select["cluster"] != -1].copy() points_outlier = services_select[services_select["cluster"] == -1].copy() if len(points_normal) > 0: cluster_service = points_normal.groupby("cluster", group_keys=True).apply( _get_service_ratio, service_code_column=service_code_column ) if isinstance(cluster_service, pd.Series): cluster_service = cluster_service.unstack(level=1, fill_value=0) polygons_normal = points_normal.dissolve("cluster").concave_hull(ratio=0.1, allow_holes=True) df_clusters_normal = pd.concat([cluster_service, polygons_normal.rename("geometry")], axis=1) cluster_normal = df_clusters_normal.index.max() points_normal["outlier"] = False df_clusters_normal["outlier"] = False else: df_clusters_normal = None cluster_normal = 0 if len(points_outlier) > 0: clusters_outlier = cluster_normal + 1 new_clusters = list(range(clusters_outlier, clusters_outlier + len(points_outlier))) points_outlier.loc[:, "cluster"] = new_clusters cluster_service = points_outlier.groupby("cluster", group_keys=True).apply( _get_service_ratio, service_code_column=service_code_column ) if isinstance(cluster_service, pd.Series): cluster_service = cluster_service.unstack(level=1, fill_value=0) df_clusters_outlier = cluster_service.join(points_outlier.set_index("cluster")["geometry"]) points_outlier["outlier"] = True df_clusters_outlier["outlier"] = True else: points_outlier = None df_clusters_outlier = None df_clusters = pd.concat([df_clusters_normal, df_clusters_outlier]).fillna(0).set_geometry("geometry") df_clusters["geometry"] = df_clusters["geometry"].buffer(min_dist / 2) df_clusters = df_clusters.reset_index().rename(columns={"index": "cluster"}) points = pd.concat([points_normal, points_outlier]) return df_clusters.to_crs(original_crs), points.to_crs(original_crs)