import datetime as dt
import math
import re
import time
from collections import defaultdict
from typing import Literal
import geopandas as gpd
import pandas as pd
import requests
from shapely import LineString, MultiPolygon, Polygon, unary_union
from shapely.ops import polygonize
from iduedu import config
from iduedu.modules.overpass.overpass_cache import cache_load, cache_save_async
# Module logger, taken from the package-level `iduedu.config` object.
logger = config.logger
import threading
from email.utils import parsedate_to_datetime
from time import monotonic
class RateLimiter:
    """
    Thread-safe limiter that spaces out grants of :meth:`wait` by at least ``min_interval`` seconds.

    Each grant reserves the next slot ``min_interval`` seconds after the moment it was given;
    callers arriving earlier block on a condition variable until that slot is reached.
    """

    def __init__(self, min_interval: float):
        self.min_interval = float(min_interval)
        # Monotonic timestamp before which no further call is granted.
        self._next_ts = 0.0
        self._cv = threading.Condition()

    def wait(self) -> None:
        """Block until the caller may proceed, then reserve the following slot."""
        with self._cv:
            while True:
                now = monotonic()
                if now >= self._next_ts:
                    # Fixed slot schedule: now >= _next_ts here, so the next slot is min_interval from now.
                    self._next_ts = now + self.min_interval
                    # Wake other waiters so they recompute their remaining sleep against the new slot.
                    self._cv.notify_all()
                    return
                # Sleep at most until the reserved slot; the loop re-checks after every wake-up.
                self._cv.wait(timeout=self._next_ts - now)
# Minimum spacing in seconds between consecutive Overpass HTTP calls, read from the package config.
OVERPASS_MIN_INTERVAL = config.overpass_min_interval
# Single module-wide limiter; `_overpass_http` waits on it before every request (queries and /status checks).
OVERPASS_RL = RateLimiter(OVERPASS_MIN_INTERVAL)
def _overpass_http(
    method: Literal["GET", "POST"], url: str, *, params=None, data=None, timeout=None
) -> requests.Response:
    """
    Perform one rate-limited HTTP call to an Overpass endpoint.

    Blocks on the module-wide ``OVERPASS_RL`` limiter first, then sends the request with the
    user agent, proxies and SSL-verification setting taken from ``config``.

    Parameters:
        method: ``"GET"`` sends ``params`` as the query string; any other value sends ``data`` as a POST body.
        url: full endpoint URL.
        params: query parameters (used for GET only).
        data: request body (used for POST only).
        timeout: request timeout; a falsy value falls back to ``config.timeout``.

    Returns:
        The raw ``requests.Response``; its status code is not checked here.
    """
    OVERPASS_RL.wait()
    request_kwargs = {
        "timeout": timeout or config.timeout,
        "headers": {"User-Agent": config.user_agent},
        "proxies": config.proxies,
        "verify": config.verify_ssl,
    }
    if method == "GET":
        return requests.get(url, params=params, **request_kwargs)
    return requests.post(url, data=data, **request_kwargs)
class RequestError(RuntimeError):
    """
    Base error raised for any problem with an Overpass HTTP request.

    Besides the message, it carries the HTTP status code, reason phrase and response body
    (text and raw bytes) when they are available; each defaults to ``None``.
    """

    def __init__(self, message, status_code=None, reason=None, response_text=None, response_content=None):
        super().__init__(message)
        self.status_code, self.reason = status_code, reason
        self.response_text, self.response_content = response_text, response_content

    def __str__(self):  # pragma: no cover
        text = f"{super().__str__()} (status: {self.status_code}, reason: {self.reason})."
        # A 400 from Overpass here usually means the query polygon used the wrong CRS.
        if self.status_code == 400:
            text += " Make sure provided polygon is in CRS 4326."
        return text
def _get_overpass_pause(
    base_endpoint: str,
    recursion_pause: float = 5,
) -> float:
    """
    Ask the Overpass server how long to wait before the next query may start.

    The ``/status`` URL is derived from ``base_endpoint`` (an ``.../interpreter`` URL) and fetched
    through the rate-limited HTTP helper. The fifth line of the reply is inspected:

    - first token is an integer (number of available slots) -> 0, no wait;
    - first token is ``"Slot"`` -> seconds until the UTC time in the fourth token, at least 1;
    - first token is ``"Currently"`` -> sleep ``recursion_pause`` seconds and poll again.

    Parameters:
        base_endpoint: Overpass interpreter URL.
        recursion_pause: seconds to sleep between polls while the server reports ``"Currently"``.

    Returns:
        Pause in seconds.

    Raises:
        RequestError: if ``/status`` is unreachable, its reply cannot be parsed, or the status is unknown.
    """
    base = base_endpoint.rstrip("/")
    if base.endswith("/interpreter"):
        base = base[: -len("/interpreter")]
    url = base + "/status"

    # Iterative polling instead of recursion: a long-busy server can no longer exhaust the stack.
    while True:
        try:
            response = _overpass_http("GET", url, timeout=config.timeout)
            response_text = response.text
        except requests.RequestException as e:  # pragma: no cover
            raise RequestError(f"Unable to reach {url}, {e}") from e

        try:
            status = response_text.split("\n")[4]
            status_first_part = status.split(" ")[0]
        except (AttributeError, IndexError, ValueError) as e:  # pragma: no cover
            raise RequestError(f"Unable to parse {url} response: {response_text}") from e

        try:
            int(status_first_part)  # number of available slots
        except ValueError:
            pass
        else:
            return 0

        if status_first_part == "Slot":
            # Wrap parse failures so callers, which only catch RequestError, can fall back gracefully.
            try:
                utc_time_str = status.split(" ")[3]
                utc_time = dt.datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ,").replace(
                    tzinfo=dt.timezone.utc
                )
            except (IndexError, ValueError) as e:  # pragma: no cover
                raise RequestError(f"Unable to parse {url} response: {response_text}") from e
            utc_now = dt.datetime.now(tz=dt.timezone.utc)
            seconds = int(math.ceil((utc_time - utc_now).total_seconds()))
            return max(seconds, 1)

        if status_first_part != "Currently":
            raise RequestError(f"Unrecognized server status: {status!r}")

        # The server only reports running queries and no upcoming slot yet: check back later.
        time.sleep(recursion_pause)
def _retry_after_seconds(retry_after: str) -> int:
    """
    Convert an HTTP ``Retry-After`` header value into a wait in whole seconds.

    Accepts an integer number of seconds or an HTTP-date. Unparseable values fall back to
    5 seconds, and the result is never below 1.
    """
    try:
        wait_s = int(retry_after)
    except ValueError:
        try:
            ra_dt = parsedate_to_datetime(retry_after)
            wait_s = int((ra_dt - dt.datetime.now(tz=ra_dt.tzinfo)).total_seconds())
        except (TypeError, ValueError, OverflowError):
            wait_s = 5
    return max(wait_s, 1)


def _overpass_request(
    method: Literal["GET", "POST"],
    overpass_url: str,
    params: dict | None = None,
    data: dict | None = None,
    timeout: float | None = None,
    *,
    max_retries: int | None = None,
    backoff_base: float | None = None,
) -> requests.Response:
    """
    Send a request to the Overpass interpreter, waiting for a free slot and retrying on failures.

    Before every attempt the server ``/status`` is checked via ``_get_overpass_pause``, and the
    reported pause is honoured; a failing status check is logged and ignored. Then:

    - HTTP 200: the response is returned;
    - network error, or a status in ``config.overpass_retry_statuses``: retry after
      ``min(60, backoff_base ** attempt)`` seconds;
    - HTTP 429: always retried, after ``Retry-After`` if given; otherwise after the server-reported
      pause, falling back to the same capped exponential backoff;
    - any other status, or a failure on the last attempt: ``RequestError``.

    Parameters:
        method: ``"GET"`` or ``"POST"``.
        overpass_url: interpreter endpoint URL.
        params: GET query parameters.
        data: POST body.
        timeout: per-request timeout; falls back to ``config.timeout``.
        max_retries: retries after the first attempt (default ``config.overpass_max_retries``).
        backoff_base: base of the exponential backoff (default ``config.overpass_backoff_base``).

    Returns:
        The successful ``requests.Response``.

    Raises:
        RequestError: on a non-retryable status, when retries are exhausted, or on a network
            error during the last attempt.
    """
    if max_retries is None:
        max_retries = config.overpass_max_retries
    if backoff_base is None:
        backoff_base = config.overpass_backoff_base
    # A negative value would skip the loop entirely and leave `resp` unbound; always try once.
    max_retries = max(0, max_retries)
    retryable_statuses = set(config.overpass_retry_statuses)

    for attempt in range(max_retries + 1):
        last_attempt = attempt == max_retries

        try:
            pause = _get_overpass_pause(overpass_url)
        except RequestError as e:  # pragma: no cover
            pause = 0
            logger.debug(f"Overpass /status check failed: {e}")
        if pause > 0:
            logger.warning(f"Waiting {pause} seconds for available Overpass API slot")
            time.sleep(pause)

        try:
            resp = _overpass_http(method, overpass_url, params=params, data=data, timeout=timeout or config.timeout)
        except requests.RequestException as e:  # pragma: no cover
            if last_attempt:
                raise RequestError(f"Request error: {e}") from e
            sleep_s = min(60, backoff_base**attempt)
            logger.warning(f"Network exception {e}, retrying in {sleep_s} (attempt {attempt + 1}/{max_retries})")
            time.sleep(sleep_s)
            continue

        if resp.status_code == 200:
            return resp

        if last_attempt:
            # Retries exhausted: report the failure right away instead of sleeping first.
            break

        if resp.status_code == 429:  # pragma: no cover
            retry_after = resp.headers.get("Retry-After")
            if retry_after:
                wait_s = _retry_after_seconds(retry_after)
                logger.debug(f"HTTP 429: honoring Retry-After={wait_s}")
            else:
                try:
                    server_pause = _get_overpass_pause(overpass_url)
                except RequestError:
                    server_pause = 0
                wait_s = max(1, server_pause or int(min(60, backoff_base**attempt)))
                logger.debug(f"HTTP 429: waiting {wait_s} seconds before retry")
            time.sleep(wait_s)
            continue

        if resp.status_code in retryable_statuses:
            wait_s = min(60, backoff_base**attempt)
            logger.warning(f"HTTP {resp.status_code}: retrying in {wait_s}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_s)
            continue

        # Non-retryable status.
        break

    raise RequestError(
        message=f"Request failed with status code {resp.status_code}, reason: {resp.reason}",
        status_code=resp.status_code,
        reason=resp.reason,
        response_text=resp.text,
        response_content=resp.content,
    )
def get_boundary_by_osm_id(osm_id) -> MultiPolygon | Polygon:
    """
    Download (or load from cache) an OSM relation and assemble its boundary geometry.

    Sends ``(relation(<osm_id>);); out geom;`` to ``config.overpass_url`` via GET. The JSON reply
    is cached in the ``"boundary"`` namespace, keyed by URL, method and query text.
    For each returned relation, member ways with role ``outer`` are polygonized and unioned, and
    member ways with role ``inner`` are polygonized and subtracted from them.

    Parameters:
        osm_id: OSM relation id, interpolated as-is into the query.

    Returns:
        Union of the per-relation polygons. NOTE(review): presumably an empty geometry when the
        reply has no elements; confirm against callers.
    """
    overpass_query = f"""
{config.overpass_header}
(relation({osm_id}););
out geom;
"""
    cache_key_src = f"{config.overpass_url}\nGET\n{overpass_query}"
    json_result = cache_load("boundary", cache_key_src)
    if json_result is not None:
        logger.debug(f"Using cached territory bounds with osm_id <{osm_id}>")
    else:
        logger.debug(f"Downloading territory bounds with osm_id <{osm_id}> ...")
        json_result = _overpass_request(
            method="GET",
            overpass_url=config.overpass_url,
            params={"data": overpass_query},
        ).json()
        cache_save_async("boundary", cache_key_src, json_result)

    relation_polygons = []
    for relation in json_result["elements"]:
        rings = {"outer": [], "inner": []}
        for member in relation["members"]:
            # Only members carrying way geometry contribute rings.
            if "geometry" not in member:
                continue
            role = member["role"]
            if role in rings:
                rings[role].append(LineString([(pt["lon"], pt["lat"]) for pt in member["geometry"]]))
        outer = unary_union(list(polygonize(rings["outer"])))
        inner = unary_union(list(polygonize(rings["inner"])))
        relation_polygons.append(outer.difference(inner))
    return unary_union(relation_polygons)
def get_4326_boundary(
    *, osm_id: int | None = None, territory: Polygon | MultiPolygon | gpd.GeoDataFrame | gpd.GeoSeries | None = None
) -> Polygon:
    """
    Normalize a territory boundary to a single EPSG:4326 Polygon.

    Accepts either an `osm_id` (relation id) to fetch the boundary from OSM, a direct
    `Polygon`/`MultiPolygon`, or a `GeoDataFrame`/`GeoSeries`. Returns a single `Polygon` in lon/lat.
    For `MultiPolygon`, the function returns its **convex hull** as a Polygon.

    Parameters:
        osm_id (int | None): OSM relation id. If provided, boundary is fetched via Overpass and
            takes precedence over `territory`.
        territory (Polygon | MultiPolygon | gpd.GeoDataFrame | gpd.GeoSeries | None): Existing boundary
            geometry, or a GeoDataFrame/GeoSeries containing it. GeoDataFrame/GeoSeries input is
            reprojected to 4326 and unioned (`.union_all()`).

    Returns:
        (shapely.Polygon): Boundary polygon in EPSG:4326.

    Raises:
        ValueError: if neither `osm_id` nor `territory` is given.
        TypeError: if the (resulting) geometry is not a Polygon or MultiPolygon.

    Notes:
        - Input `Polygon` is returned as-is (assumed already in EPSG:4326 by caller).
        - For `MultiPolygon`, a **convex hull** is returned (may slightly expand the area and
          fill gaps between parts).
        - For `GeoDataFrame`/`GeoSeries`, the geometry is first reprojected to 4326 and dissolved via
          `.union_all()`; the result is then normalized to a `Polygon` (convex hull if needed).

    Examples:
        get_4326_boundary(osm_id=1114252)  # fetch by OSM id
        get_4326_boundary(territory=poly4326)  # keep polygon
        get_4326_boundary(territory=multi_poly_4326)  # convex hull of multipart
        get_4326_boundary(territory=territory_gdf)  # GDF -> to_crs(4326) -> union_all -> Polygon
    """
    if osm_id is not None:
        territory = get_boundary_by_osm_id(osm_id)
    if territory is None:
        raise ValueError("Either osm_id or territory must be specified")

    if isinstance(territory, (gpd.GeoDataFrame, gpd.GeoSeries)):
        projected = territory.to_crs(4326)
        territory = projected.union_all() if isinstance(projected, gpd.GeoSeries) else projected.geometry.union_all()

    # Shapely geometries
    if isinstance(territory, Polygon):
        return territory
    if isinstance(territory, MultiPolygon):
        return Polygon(territory.convex_hull)
    raise TypeError("territory must be one of: Polygon, MultiPolygon, GeoDataFrame, GeoSeries (or provide osm_id)")
def _poly_to_overpass(poly: Polygon) -> str:
return " ".join(f"{y} {x}" for x, y in poly.exterior.coords[:-1])
def get_routes_by_poly(polygon: "Polygon", public_transport_types: list[str]) -> list[dict]:
    """
    Query Overpass for public transport route relations inside ``polygon``.

    Parameters:
        polygon: area of interest in EPSG:4326; only its exterior ring is sent to Overpass.
        public_transport_types: values of the OSM ``route`` tag to fetch; duplicates are ignored.
            ``"subway"`` gets special handling (see Notes).

    Returns:
        The ``elements`` list of the Overpass JSON reply, an empty list when no transport types
        are given. Non-subway types yield their route relations (``out geom``) plus the tags of
        their member ways. With subway details enabled, subway yields the stop areas, stop-area
        groups and stations reached from subway routes.

    Notes:
        When ``config.overpass_date`` is set, the subway stop-area/station expansion is skipped and
        ``subway`` is queried like any other route type.
    """
    public_transport_types = sorted(set(public_transport_types))
    if not public_transport_types:
        # Keep the declared list return type; a DataFrame here broke `if not result` style checks.
        return []

    has_subway = "subway" in public_transport_types
    has_date = config.overpass_date is not None
    enable_subway_details = has_subway and not has_date
    if has_subway and has_date:  # pragma: no cover
        logger.warning(
            f"Overpass date is set ({config.overpass_date}); skipping subway stop area / station details "
            "and querying subway as regular route relations only."
        )

    # Subway leaves the plain route filter only when the dedicated stop-area block handles it;
    # otherwise it would be dropped from the query entirely.
    if enable_subway_details:
        simple_route_types = [t for t in public_transport_types if t != "subway"]
    else:
        simple_route_types = public_transport_types

    polygon_coords = _poly_to_overpass(polygon)
    header = config.overpass_header
    query_parts = [header]

    if simple_route_types:
        if len(simple_route_types) == 1:
            route_filter = f'["route"="{simple_route_types[0]}"]'
        else:
            pattern = "|".join(re.escape(t) for t in simple_route_types)
            route_filter = f'["route"~"^({pattern})$"]'
        query_parts.append(f'rel(poly:"{polygon_coords}"){route_filter}->.routes_basic;')
        query_parts.append(
            """
way(r.routes_basic);
out tags qt;
""".strip()
        )

    if enable_subway_details:
        query_parts.append(
            f"""
rel(poly:"{polygon_coords}")["route"="subway"]->.routes;
node(r.routes)->.route_nodes;
rel(bn.route_nodes)->.stop_areas;
rel(br.stop_areas)["public_transport"="stop_area_group"]["type"="public_transport"]->.groups;
nwr(r.stop_areas)["public_transport"="station"]->.stations;
""".strip()
        )

    if simple_route_types:
        query_parts.append(".routes_basic out geom qt;")
    if enable_subway_details:
        query_parts.append(
            """
.stop_areas out geom qt;
.groups out body qt;
.stations out tags qt;
""".strip()
        )

    overpass_query = "\n".join(query_parts)
    cache_key_src = f"{config.overpass_url}\nPOST\n{overpass_query}"
    json_root = cache_load("routes", cache_key_src)
    if json_root is None:
        resp = _overpass_request(
            method="POST",
            overpass_url=config.overpass_url,
            data={"data": overpass_query},
        )
        json_root = resp.json()
        cache_save_async("routes", cache_key_src, json_root)
    else:
        logger.debug("Using cached routes_by_poly result")
    return json_root.get("elements", [])
def get_network_by_filters(polygon: Polygon, way_filter: str) -> pd.DataFrame:
    """
    Fetch OSM ways matching ``way_filter`` inside ``polygon`` as a DataFrame.

    Sends ``(way<way_filter>(poly:"..."));out geom;`` via POST to ``config.overpass_url``. The JSON
    reply is cached in the ``"network"`` namespace, keyed by URL, method and query text.

    Parameters:
        polygon: area of interest in EPSG:4326; only its exterior ring is used.
        way_filter: raw Overpass tag-filter text appended after ``way``, inserted verbatim.

    Returns:
        DataFrame built from the reply's ``elements`` list, one row per element.
    """
    polygon_coords = _poly_to_overpass(polygon)
    overpass_query = f"""
{config.overpass_header}
(way{way_filter}(poly:"{polygon_coords}"););
out geom;
"""
    cache_key_src = f"{config.overpass_url}\nPOST\n{overpass_query}"
    cached_root = cache_load("network", cache_key_src)
    if cached_root is not None:
        logger.debug("Using cached network.")
        return pd.DataFrame(cached_root.get("elements", []))

    logger.debug(f"Downloading network from OSM with filters <{way_filter}> ...")
    fresh_root = _overpass_request(
        method="POST",
        overpass_url=config.overpass_url,
        data={"data": overpass_query},
    ).json()
    cache_save_async("network", cache_key_src, fresh_root)
    return pd.DataFrame(fresh_root.get("elements", []))
def fetch_member_tags(members_missing):
    """
    Fetch tags (plus position, centre or geometry where the reply has them) for OSM elements by id.

    Parameters:
        members_missing: iterable of dicts ``{'type': 'node'|'way'|'relation', 'ref': int}``.
            Every ``ref`` is converted with ``int()``. Items of any other type cannot be queried by
            id here and are ignored.

    Returns:
        dict keyed by ``(type, id)``, for example::

            {
                ("node", 123): {"tags": {...}, "__latlon__": (lon, lat)},
                ("way", 456): {"tags": {...}, "__center__": (lon, lat), "__geometry__": [...]},
                ("relation", 7): {"tags": {...}, "__center__": (lon, lat), "__geometry__": [...]},
            }

        ``__latlon__``, ``__center__`` and ``__geometry__`` are only set when the reply element
        carries the corresponding fields. An empty dict is returned when there is nothing to fetch.
    """
    ids = defaultdict(set)
    for m in members_missing:
        ids[m["type"]].add(int(m["ref"]))

    # (element type, Overpass statement keyword) in the order they appear in the query.
    statements = (("node", "node"), ("way", "way"), ("relation", "rel"))
    parts = [
        f'{keyword}(id:{",".join(map(str, sorted(ids[osm_type])))});'
        for osm_type, keyword in statements
        if ids.get(osm_type)
    ]
    if not parts:
        # Nothing queryable (empty input or only unsupported types): skip an empty-union request.
        return {}

    body = "\n".join(parts)
    q = f"{config.overpass_header}\n(\n{body}\n);\nout center qt;"
    cache_key_src = f"{config.overpass_url}\nPOST\n{q}"
    json_root = cache_load("members", cache_key_src)
    if json_root is None:
        resp = _overpass_request(method="POST", overpass_url=config.overpass_url, data={"data": q})
        json_root = resp.json()
        cache_save_async("members", cache_key_src, json_root)
    else:
        logger.debug("Using cached fetch_member_tags")

    result = {}
    for e in json_root.get("elements", []) or []:
        key = (e["type"], int(e["id"]))
        payload = {"tags": e.get("tags", {}) or {}}
        if e.get("type") == "node":
            if "lon" in e and "lat" in e:
                payload["__latlon__"] = (e["lon"], e["lat"])
        elif "center" in e:
            payload["__center__"] = (e["center"]["lon"], e["center"]["lat"])
        if e.get("geometry"):
            payload["__geometry__"] = e["geometry"]
        result[key] = payload
    return result