diff --git a/.readthedocs.yml b/.readthedocs.yml
index 0f08810..8073dbf 100644
--- a/.readthedocs.yml
+++ b/.readthedocs.yml
@@ -7,7 +7,9 @@ build:
   jobs:
     post_install:
       # Install dependencies
-      - pip install sphinx furo readthedocs-sphinx-search sphinx-autobuild sphinx-design sphinx-hoverxref sphinx-inline-tabs sphinx-issues sphinx-notfound-page myst-parser sphinx-rtd-theme
+      - pip install -r docs/requirements.txt
 
 sphinx:
-  configuration: docs/source/conf.py
\ No newline at end of file
+  configuration: docs/source/conf.py
+
+#sphinx furo readthedocs-sphinx-search sphinx-autobuild sphinx-design sphinx-hoverxref sphinx-inline-tabs sphinx-issues sphinx-notfound-page myst-parser sphinx-rtd-theme
\ No newline at end of file

[The remaining hunks delete the Sphinx build output that had been committed under docs/build/: the binary doctrees (404, about/*, modules/*), environment.pickle, the HTML .buildinfo, and the rendered HTML pages, including 404.html ("Page Not Found. Sorry, we couldn't find that page. Try using the search box or go to the homepage.") and the _modules source listings reproduced below.]
-from typing import List
-import re
-import pandas as pd
-import osmnx as ox
-from shapely.geometry import Point, Polygon, MultiPolygon
-from loguru import logger
-import pymorphy2
-from sloyka.src.utils.constants import NUM_CITY_OBJ
-from sloyka.src.geocoder.objects_address_extractor_by_rules import AddressExtractorExtra
-from sloyka.src.utils.data_getter.geo_data_getter import GeoDataGetter
-from rapidfuzz import fuzz
-import numpy as np
-
-
-
-[docs]
-class OtherGeoObjects:
-
-[docs]
- @staticmethod
- def get_and_process_osm_data(osm_id: int, tags: dict) -> pd.DataFrame:
- """
- Retrieves and processes OSM data for different urban objects.
- """
- df = GeoDataGetter.get_osm_data(osm_id, tags)
- df["geometry"] = df["geometry"].apply(OtherGeoObjects.calculate_centroid)
- df.rename(columns={df.columns[-1]: "geo_obj_tag"}, inplace=True)
- return df
-
-
-
-[docs]
- @staticmethod
- def run_osm_dfs(osm_id: int) -> pd.DataFrame:
- """
- Collects dataframes with OSM spatial data, finds centroids and combines them into one.
- """
- tags_list = [
- {"leisure": ["park", "garden", "recreation_ground"]},
- {"amenity": ["hospital", "clinic", "school", "kindergarten"]},
- {"landuse": ["cemetery"]},
- {"natural": ["beach", "water"]},
- {"railway": ["station", "subway"]},
- {"tourism": ["attraction", "museum"]},
- {"historic": ["monument", "memorial"]},
- {"place": ["square"]},
- ]
-
- osm_dfs = list()
- for tags in tags_list:
- logger.debug(f'getting {osm_id, tags}')
- try:
- tmp_df = OtherGeoObjects.get_and_process_osm_data(osm_id, tags)
- osm_dfs.append(tmp_df)
- except RuntimeError:
- logger.warning(f'Runtime error during fetching {osm_id, tags}')
- continue
- if osm_dfs:
- osm_combined_df = pd.concat(osm_dfs, axis=0)
- logger.debug(f'got {osm_id, tags}')
- logger.debug(f'{osm_combined_df.shape}')
- return osm_combined_df
- else:
- logger.warning(f'No data were gathered about city objects in {osm_id}')
- return pd.DataFrame()
-
-
-
-[docs]
- @staticmethod
- def calculate_centroid(geometry) -> Point:
- """
- Calculates the centroid for polygons.
- """
- if isinstance(geometry, (Polygon, MultiPolygon)):
- return geometry.centroid
- elif isinstance(geometry, Point):
- return geometry
- else:
- return None
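# A minimal illustrative sketch (assumes the OtherGeoObjects class above is
# importable; toy geometries, not library data): polygons collapse to their
# centroid, points pass through unchanged, anything else yields None.
from shapely.geometry import Point, Polygon

square = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])
print(OtherGeoObjects.calculate_centroid(square))            # POINT (1 1)
print(OtherGeoObjects.calculate_centroid(Point(5.0, 5.0)))   # POINT (5 5)
print(OtherGeoObjects.calculate_centroid("not a geometry"))  # None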
-
-
-
-[docs]
- @staticmethod
- def extract_geo_obj(text) -> List[str]:
-        """
-        Extracts location entities from the text using the Natasha-based
-        AddressExtractorExtra rules together with pymorphy2 morphology.
-        """
- if text is None:
- return None
- morph = pymorphy2.MorphAnalyzer()
- extractor = AddressExtractorExtra(morph)
-
- other_geo_obj = []
-
- matches = extractor(text)
- if not matches:
- return other_geo_obj
- try:
- for match in matches:
- if not match:
- continue
- part = match.fact
- if part.value and part.type:
- combined_phrase = f"{part.value} {part.type}"
- other_geo_obj.append(combined_phrase)
- elif part.value:
- other_geo_obj.append(part.value)
- elif part.type:
- other_geo_obj.append(part.type)
- if not other_geo_obj:
- return other_geo_obj
- except Exception as e:
- # logger.warning(f"Error extracting geo objects: {e}")
- return other_geo_obj
- return other_geo_obj
-
-
-
-[docs]
- @staticmethod
- def restoration_of_normal_form(other_geo_obj, osm_combined_df, threshold=0.7) -> List[str]:
-        """
-        Compares each extracted location entity with the OSM names and returns the
-        OSM spelling when the similarity score exceeds the threshold (0.7 by default);
-        entities whose embedded numbers differ from the OSM name are never matched.
-        """
- osm_name_obj = osm_combined_df["name"].tolist()
- similarity_matrix = np.zeros((len(other_geo_obj), len(osm_name_obj)))
-
- def extract_numbers(s):
- return re.findall(r"\d+", s)
-
- for i, word1 in enumerate(other_geo_obj):
- numbers_from_extraction = extract_numbers(word1)
- for j, word2 in enumerate(osm_name_obj):
- numbers_from_OSM_name = extract_numbers(word2)
- if numbers_from_extraction == numbers_from_OSM_name:
- similarity = fuzz.ratio(word1, word2) / 100.0
- else:
- similarity = 0
- similarity_matrix[i, j] = similarity
-
- restoration_list = other_geo_obj.copy()
- for i in range(len(other_geo_obj)):
- max_index = np.argmax(similarity_matrix[i])
- if similarity_matrix[i, max_index] > threshold:
- restoration_list[i] = osm_name_obj[max_index]
- else:
- restoration_list[i] = ""
-
- return restoration_list
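# Illustrative sketch of the matching rule above (toy data; pandas, numpy and
# rapidfuzz are already imported at the top of this module): names are compared
# only when their embedded numbers agree, and fuzz.ratio scaled to 0-1 must
# exceed the threshold for the OSM spelling to replace the extracted one.
toy_osm = pd.DataFrame({"name": ["Парк Победы", "Школа № 6"]})
print(OtherGeoObjects.restoration_of_normal_form(["парк победы"], toy_osm))
# ['Парк Победы']: fuzz.ratio("парк победы", "Парк Победы") / 100 ≈ 0.82 > 0.7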
-
-
- @staticmethod
- def find_num_city_obj(text, NUM_CITY_OBJ) -> List[str]:
- """
- This function searches for urban objects in the text, the names of which are represented as a number. For example, "school No. 6".
- """
- text = str(text)
- text = text.lower()
- num_obj_list = []
- for key, forms in NUM_CITY_OBJ.items():
- for form in forms:
- pattern = rf"\b{re.escape(form)}\b\s+№?\s*(\d+)"
- matches = re.findall(pattern, text)
- for match in matches:
- num_obj_list.append(f"{key} № {match}")
- num_obj_list = list(set(num_obj_list))
- num_obj_list_clear = {}
- for obj in num_obj_list:
- key = obj.split(" № ")[1]
- if key in num_obj_list_clear:
- if len(obj.split(" № ")[0]) > len(num_obj_list_clear[key].split(" № ")[0]):
- num_obj_list_clear[key] = obj
- else:
- num_obj_list_clear[key] = obj
-
- return list(num_obj_list_clear.values())
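# Illustrative sketch with a toy NUM_CITY_OBJ mapping (the real constant lives
# in sloyka.src.utils.constants): numbered objects are normalised to
# "<canonical name> № <number>", keeping one entry per number.
toy_num_city_obj = {"школа": ["школа", "школы", "школе"]}
print(OtherGeoObjects.find_num_city_obj("Ремонт в школе № 6 затянулся", toy_num_city_obj))
# ['школа № 6']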
-
- @staticmethod
- def combine_city_obj(df_obj) -> pd.DataFrame:
- """
- Combines the found named urban objects and urban objects whose names are in the form of numbers.
- """
- df_obj["other_geo_obj"] = df_obj["other_geo_obj"] + df_obj["other_geo_obj_num"]
- df_obj.drop(columns=["other_geo_obj_num"], inplace=True)
- return df_obj
-
- @staticmethod
- def expand_toponym(df_obj) -> pd.DataFrame:
- """
- Splits the list of found entities into different rows for further analysis.
- """
- expanded_df = df_obj.copy()
- expanded_df["other_geo_obj"] = expanded_df["other_geo_obj"].apply(
- lambda x: x if isinstance(x, list) and x else None
- )
- expanded_df = expanded_df.explode("other_geo_obj").reset_index(drop=True)
- return expanded_df
-
- @staticmethod
- def find_geometry(toponym, osm_combined_df) -> Point:
- """
- Finds the coordinate in the OSM array by the name of the city object.
- """
- if toponym is None:
- return None
- match = osm_combined_df[osm_combined_df["name"] == toponym]
- if not match.empty:
- return match.iloc[0, 1]
- else:
- return None
-
- @staticmethod
- def find_geo_obj_tag(toponym, osm_combined_df) -> str:
- """
- Finds the geo_obj_tag in the OSM array by the name of the city object.
- """
- if toponym is None:
- return None
- match = osm_combined_df[osm_combined_df["name"] == toponym]
- if not match.empty:
- return match.iloc[0, -1]
- else:
- return None
-
- @staticmethod
- def get_unique_part_types(df):
- return df["other_geo_obj"].unique()
-
-
-[docs]
- @staticmethod
- def run(osm_id: int, df: pd.DataFrame, text_column: str) -> pd.DataFrame:
-        """
-        Launches the extraction of urban objects other than streets from the texts.
-        """
- df_obj = df.copy()
- df_obj["Numbers"] = pd.NA
- # osm_combined_df = OtherGeoObjects.run_osm_dfs(osm_id)
-
- df_obj["other_geo_obj"] = df_obj[text_column].apply(OtherGeoObjects.extract_geo_obj)
- df_obj["other_geo_obj_num"] = df_obj[text_column].apply(
- lambda x: OtherGeoObjects.find_num_city_obj(x, NUM_CITY_OBJ)
- )
-
- df_obj = OtherGeoObjects.combine_city_obj(df_obj)
-
- osm_combined_df = OtherGeoObjects.run_osm_dfs(osm_id)
-
- if not osm_combined_df.empty:
- df_obj["other_geo_obj"] = df_obj["other_geo_obj"].apply(
- lambda x: OtherGeoObjects.restoration_of_normal_form(x, osm_combined_df)
- )
- df_obj = OtherGeoObjects.expand_toponym(df_obj)
-
- df_obj["geometry"] = df_obj["other_geo_obj"].apply(lambda x: OtherGeoObjects.find_geometry(x, osm_combined_df))
- df_obj["geo_obj_tag"] = df_obj["other_geo_obj"].apply(
- lambda x: OtherGeoObjects.find_geo_obj_tag(x, osm_combined_df)
- )
- df_obj = df_obj[df_obj["geometry"].notna()]
-
- return df_obj
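# Hedged usage sketch, kept as comments because it downloads OSM data over the
# network; 337422 is used here only as an example OSM relation id.
#
#   posts = pd.DataFrame({"text": ["Вчера у школы № 6 прорвало трубу"]})
#   tagged = OtherGeoObjects.run(osm_id=337422, df=posts, text_column="text")
#   tagged[["other_geo_obj", "geometry", "geo_obj_tag"]].head()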
-
-
-
-"""
-TODO: add spellchecker since there might be misspelled words.
-
-This module is aimed to provide necessary tools to find mentioned
-location in the text.
-
-@class:Location:
-A class aimed to efficiently geocode addresses using Nominatim. Geocoded addresses are stored in the 'book' dictionary argument.
-Thus, if the address repeats, it would be taken from the book.
-
-@class:Streets:
-A class encapsulating functionality for retrieving street data
-for a specified city from OSM and processing it to extract useful information for geocoding purposes.
-
-@class:Geocoder:
-A class providing functionality for simple geocoding and address extraction.
-"""
-import numpy as np
-import re
-import warnings
-import os
-import flair
-import geopandas as gpd
-import pandas as pd
-import pymorphy2
-import torch
-import string
-import math
-from rapidfuzz import fuzz
-from nltk.stem.snowball import SnowballStemmer
-from sloyka.src.utils.data_getter.historical_geo_data_getter import HistGeoDataGetter
-from sloyka.src.utils.constants import (
- AREA_STOPWORDS,
- GROUP_STOPWORDS,
- REGEX_PATTERN,
- REPLACEMENT_STRING,
-)
-
-from flair.models import SequenceTagger
-from shapely.geometry import Point
-from tqdm import tqdm
-# from natasha import (
-# Segmenter,
-# MorphVocab,
-# NewsEmbedding,
-# NewsMorphTagger,
-# NewsSyntaxParser,
-# NewsNERTagger,
-# Doc,
-# )
-
-from loguru import logger
-
-from pandarallel import pandarallel
-from sloyka.src.geocoder.city_objects_getter import OtherGeoObjects
-from sloyka.src.utils.data_getter.street_getter import Streets
-from sloyka.src.utils.data_getter.location_getter import Location
-from sloyka.src.utils.data_getter.geo_data_getter import GeoDataGetter
-from sloyka.src.geocoder.street_extractor import StreetExtractor
-from sloyka.src.geocoder.word_form_matcher import WordFormFinder
-
-import warnings
-
-warnings.simplefilter("ignore")
-warnings.filterwarnings("ignore", category=DeprecationWarning)
-
-pandarallel.initialize(progress_bar=True, nb_workers=-1)
-
-# segmenter = Segmenter()
-# morph_vocab = MorphVocab()
-morph = pymorphy2.MorphAnalyzer()
-
-# emb = NewsEmbedding()
-# morph_tagger = NewsMorphTagger(emb)
-# syntax_parser = NewsSyntaxParser(emb)
-# ner_tagger = NewsNERTagger(emb)
-warnings.simplefilter(action="ignore", category=FutureWarning)
-warnings.filterwarnings("ignore", category=DeprecationWarning)
-
-
-
-
-stemmer = SnowballStemmer("russian")
-
-tqdm.pandas()
-
-
-
-[docs]
-class Geocoder:
-    """
-    This class provides simple geocoding functionality.
-    """
-
- dir_path = os.path.dirname(os.path.realpath(__file__))
-
- global_crs: int = 4326
-
- def __init__(
- self,
- model_path: str = "Geor111y/flair-ner-addresses-extractor",
- device: str = "cpu",
- osm_id: int = None,
-        city_tags: dict = {"place": ["state"]}
- ):
- self.device = device
- flair.device = torch.device(device)
- self.classifier = SequenceTagger.load(model_path)
- self.osm_id = osm_id
- self.osm_city_name = (
- GeoDataGetter()
- .get_features_from_id(osm_id=self.osm_id,tags=city_tags, selected_columns=["name", "geometry"])
- .iloc[0]["name"]
- )
- self.street_names = Streets.run(self.osm_id)
-
-
- @staticmethod
- def get_stem(street_names_df: pd.DataFrame) -> pd.DataFrame:
-        """
-        Generates inflected case forms (nominative, genitive, dative, etc.) for each
-        street name so that street mentions written in any grammatical case can be
-        matched against the street names DataFrame.
-        """
- logger.info("get_stem started")
-
- morph = pymorphy2.MorphAnalyzer()
- cases = ["nomn", "gent", "datv", "accs", "ablt", "loct"]
-
- for case in cases:
- street_names_df[case] = street_names_df["street_name"].apply(
- lambda x: morph.parse(x)[0].inflect({case}).word if morph.parse(x)[0].inflect({case}) else None
- )
- return street_names_df
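# What get_stem produces per case, shown on a single word (pymorphy2 is already
# imported above; inflect() may return None for some words, which get_stem handles):
example_parse = pymorphy2.MorphAnalyzer().parse("невский")[0]
print(example_parse.inflect({"gent"}).word)  # e.g. 'невского'
print(example_parse.inflect({"datv"}).word)  # e.g. 'невскому'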
-
-
- @staticmethod
- def get_level(row: pd.Series) -> str:
- """
- Addresses in the messages are recognized on different scales:
- 1. Where we know the street name and house number -- house level;
- 2. Where we know only street name -- street level (with the centroid
- geometry of the street);
- 3. Where we don't know any info but the city -- global level.
- """
-
- if (not pd.isna(row["Street"])) and (row["Numbers"] == ""):
- return "street"
- elif (not pd.isna(row["Street"])) and (row["Numbers"] != ""):
- return "house"
- else:
- return "global"
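# Quick illustration of the level assignment on toy rows (not library data):
print(Geocoder.get_level(pd.Series({"Street": "невский проспект", "Numbers": "112"})))  # 'house'
print(Geocoder.get_level(pd.Series({"Street": "невский проспект", "Numbers": ""})))     # 'street'
print(Geocoder.get_level(pd.Series({"Street": None, "Numbers": ""})))                   # 'global'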
-
-
- def create_gdf(self, df: pd.DataFrame) -> gpd.GeoDataFrame:
- """
- Function simply creates gdf from the recognised geocoded geometries.
- """
- logger.info("create_gdf started")
-
- df["Location"] = df["addr_to_geocode"].progress_apply(Location().query)
- df = df.dropna(subset=["Location"])
- df["geometry"] = df['Location'].apply(lambda x: Point(x.longitude, x.latitude))
- df["Location"] = df['Location'].apply(lambda x: x.address)
-        df["Numbers"] = df["Numbers"].astype(str)
- gdf = gpd.GeoDataFrame(df, geometry="geometry", crs=Geocoder.global_crs)
-
- return gdf
-
- def set_global_repr_point(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
-        """
-        This function sets the centroid (actually, the representative point) of the
-        geocoded addresses for those texts that weren't geocoded (or didn't
-        contain any addresses according to the trained NER model).
-        """
-
- try:
- gdf.loc[gdf["level"] == "global", "geometry"] = gdf.loc[
- gdf["level"] != "global", "geometry"
- ].unary_union.representative_point()
- except AttributeError:
- pass
-
- return gdf
-
- def merge_to_initial_df(self, gdf: gpd.GeoDataFrame, initial_df: pd.DataFrame) -> gpd.GeoDataFrame:
- """
- This function merges geocoded df to the initial df in order to keep
- all original attributes.
- """
-
- # initial_df.drop(columns=['key_0'], inplace=True)
- gdf = initial_df.join(
- gdf[
- [
- "Street",
- "initial_street",
- "only_full_street_name",
- "Numbers",
- "Score",
- "location_options",
- "Location",
- "geometry",
- ]
- ],
- how="outer",
- )
- gdf = gpd.GeoDataFrame(gdf, geometry="geometry", crs=Geocoder.global_crs)
-
-        return gdf
-
-    @staticmethod
-    def assign_street(variable):
- if isinstance(variable, float) and math.isnan(variable):
- return "street"
- return variable
-
- def get_df_areas(self, osm_id, tags):
- """
- Retrieves the GeoDataFrame of areas corresponding to the given OSM ID and tags.
-
-        Args:
-            osm_id (int): The OpenStreetMap ID.
-            tags (dict): The tags to filter by.
-
- Returns:
- gpd.GeoDataFrame: The GeoDataFrame containing the areas.
-
- This function first checks if the GeoDataFrame corresponding to the given OSM ID is already in the cache.
- If it is, it returns the cached GeoDataFrame. Otherwise, it retrieves the GeoDataFrame from the HistGeoDataGetter,
- filters out the 'way' elements, and adds it to the cache. Finally, it returns the GeoDataFrame from the cache.
- """
- area_cache = {}
- if osm_id not in area_cache:
- geo_data_getter = HistGeoDataGetter()
- df_areas = geo_data_getter.get_features_from_id(osm_id=osm_id, tags=tags)
- df_areas = df_areas[df_areas["element_type"] != "way"]
- area_cache[osm_id] = df_areas
- return area_cache[osm_id]
-
- def preprocess_group_name(self, group_name):
- """
- Preprocesses a group name by converting it to lowercase, removing special characters, and removing specified stopwords.
-
- Args:
- group_name (str): The group name to preprocess.
-
- Returns:
- str: The preprocessed group name.
- """
- group_name = group_name.lower()
- group_name = re.sub(REGEX_PATTERN, REPLACEMENT_STRING, group_name)
- words_to_remove = GROUP_STOPWORDS
- for word in words_to_remove:
- group_name = re.sub(word, "", group_name, flags=re.IGNORECASE)
- return group_name
-
- def preprocess_area_names(self, df_areas):
- """
- Preprocesses the area names in the given DataFrame by removing specified stopwords, converting the names to lowercase,
- and stemming them.
-
- Parameters:
- df_areas (DataFrame): The DataFrame containing the area names.
-
- Returns:
- DataFrame: The DataFrame with preprocessed area names, where the 'area_name' column contains the original names
- with stopwords removed, the 'area_name_processed' column contains the lowercase names with special characters
- removed, and the 'area_stems' column contains the stemmed names.
- """
- words_to_remove = AREA_STOPWORDS
- for word in words_to_remove:
- df_areas["area_name"] = df_areas["name"].str.replace(word, "", regex=True)
-
- df_areas["area_name_processed"] = df_areas["area_name"].str.lower()
- df_areas["area_name_processed"] = df_areas["area_name_processed"].str.replace(
- REGEX_PATTERN, REPLACEMENT_STRING, regex=True
- )
- df_areas["area_stems"] = df_areas["area_name_processed"].apply(
- lambda x: [stemmer.stem(word) for word in x.split()]
- )
- return df_areas
-
- def match_group_to_area(self, group_name, df_areas):
- """
- Matches a given group name to an area in a DataFrame of areas.
-
- Args:
- group_name (str): The name of the group to match.
- df_areas (DataFrame): The DataFrame containing the areas to match against.
-
- Returns:
- tuple: A tuple containing the best match for the group name and the admin level of the match.
- If no match is found, returns (None, None).
- """
- group_name_stems = [stemmer.stem(word) for word in group_name.split()]
- max_partial_ratio = 20
- max_token_sort_ratio = 20
- best_match = None
- admin_level = None
-
- for _, row in df_areas.iterrows():
- area_stems = row["area_stems"]
-
- partial_ratio = fuzz.partial_ratio(group_name, row["area_name_processed"])
- token_sort_ratio = fuzz.token_sort_ratio(group_name_stems, area_stems)
-
- if partial_ratio > max_partial_ratio and token_sort_ratio > max_token_sort_ratio:
- max_partial_ratio = partial_ratio
- max_token_sort_ratio = token_sort_ratio
- best_match = row["area_name"]
- admin_level = row["key"]
-
- return best_match, admin_level
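# The two rapidfuzz scores combined above, shown on plain strings: partial_ratio
# rewards substring matches, token_sort_ratio ignores word order.
print(fuzz.partial_ratio("колпино онлайн", "колпино"))          # 100.0 (exact substring)
print(fuzz.token_sort_ratio("округ невский", "невский округ"))  # 100.0 (word order ignored)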
-
-
-[docs]
-    def run(
-        self,
-        df: pd.DataFrame,
-        tags: dict | None = None,
-        text_column: str = "text",
-        group_column: str | None = "group_name",
-        search_for_objects: bool = False,
-    ):
- """
- Runs the data processing pipeline on the input DataFrame.
-
-        Args:
-            df (pd.DataFrame): The input DataFrame.
-            tags (dict, optional): Area tags used to match group names to territories. Defaults to None.
-            text_column (str, optional): The name of the text column in the DataFrame. Defaults to "text".
-            group_column (str, optional): Column with group/community names used for area matching. Defaults to "group_name".
-            search_for_objects (bool, optional): Whether to additionally extract non-street city objects. Defaults to False.
-
- Returns:
- gpd.GeoDataFrame: The processed DataFrame after running the data processing pipeline.
-
- This function retrieves the GeoDataFrame of areas corresponding to the given OSM ID and tags.
- It then preprocesses the area names and matches each group name to an area. The best match
- and admin level are assigned to the DataFrame. The function also retrieves other geographic
- objects and street names, preprocesses the street names, finds the word form, creates a GeoDataFrame,
- merges it with the other geographic objects, assigns the street tag, and returns the final GeoDataFrame.
- """
-
- # initial_df = df.copy()
- if tags:
- df_areas = self.get_df_areas(self.osm_id, tags)
- df_areas = self.preprocess_area_names(df_areas)
-
- if group_column and group_column in df.columns:
- for i, group_name in enumerate(df[group_column]):
- processed_group_name = self.preprocess_group_name(group_name)
- best_match, admin_level = self.match_group_to_area(processed_group_name, df_areas)
- df.at[i, "territory"] = best_match
- df.at[i, "key"] = admin_level
- del df_areas
- # df = AreaMatcher.run(self, df, osm_id, tags, date)
-
- df[text_column] = df[text_column].astype(str).str.replace('\n', ' ')
- df_reconstruction = df.copy()
- df[text_column] = df[text_column].apply(str)
-
-
- df = StreetExtractor.process_pipeline(df, text_column, self.classifier)
- street_names = self.get_stem(self.street_names)
-
- df = WordFormFinder(self.osm_city_name).find_word_form(df, street_names)
-
- del street_names
- gdf = self.create_gdf(df)
-        if search_for_objects:
-            df_obj = OtherGeoObjects.run(self.osm_id, df, text_column)
-            gdf = pd.concat([gdf, df_obj], ignore_index=True)
-            del df_obj
-            gdf["geo_obj_tag"] = gdf["geo_obj_tag"].apply(Geocoder.assign_street)
-
-        del df  # no longer needed past this point
-
- gdf = pd.concat(
- [gdf, df_reconstruction[~df_reconstruction[text_column].isin(gdf[text_column])]], ignore_index=True
- )
-
- # gdf2 = self.merge_to_initial_df(gdf, initial_df)
-
- # # Add a new 'level' column using the get_level function
- # gdf2["level"] = gdf2.progress_apply(self.get_level, axis=1)
- # gdf2 = self.set_global_repr_point(gdf2)
- gdf.set_crs(4326, inplace=True)
- return gdf
-
-
-
-
- # def extract_ner_street(self, text: str) -> pd.Series:
- # """
- # Function calls the pre-trained custom NER model to extract mentioned
- # addresses from the texts (usually comment) in social networks in
- # russian language.
- # The model scores 0.8 in F1 and other metrics.
- # """
-
- # try:
- # text = re.sub(r"\[.*?\]", "", text)
- # except Exception:
- # return pd.Series([None, None])
-
- # sentence = Sentence(text)
- # self.classifier.predict(sentence)
- # try:
- # res = sentence.get_labels("ner")[0].labeled_identifier.split("]: ")[1].split("/")[0].replace('"', "")
- # score = round(sentence.get_labels("ner")[0].score, 3)
- # if score > 0.7:
- # return pd.Series([res, score])
- # else:
- # return pd.Series([None, None])
-
- # except IndexError:
- # return pd.Series([None, None])
-
- # @staticmethod
- # def extract_building_num(text, street_name, number) -> string:
- # """
- # The function finds the already extracted street name in the text
- # and searches for numbers related to building numbers in a certain range of indexes
- # around the street name.
- # """
- # if pd.notna(number) and number != "":
- # return number
- # if isinstance(text, float) and math.isnan(text):
- # return ""
-
- # clear_text = str(text).translate(str.maketrans("", "", string.punctuation))
- # clear_text = clear_text.lower().split(" ")
- # positions = [index for index, item in enumerate(clear_text) if item == street_name]
-
- # if not positions:
- # return ""
-
- # position = positions[0]
- # search_start = max(0, position)
- # search_end = min(len(clear_text), position + END_INDEX_POSITION)
-
- # num_result = []
-
- # for f_index in range(max(0, search_start), min(len(clear_text), search_end)):
- # element = clear_text[f_index]
- # if any(character.isdigit() for character in str(element)) and len(str(element)) <= 3:
- # num_result.append(element)
- # break
-
- # if num_result:
- # return num_result[0]
- # else:
- # return ""
-
- # @staticmethod
- # def extract_toponym(text, street_name) -> string:
- # """
- # The function finds the already extracted street name in the text
- # and searches for words related to toponyms in a certain range of indexes
- # around the street name.
- # """
- # if isinstance(text, float) and math.isnan(text):
- # return None
-
- # clear_text = str(text).translate(str.maketrans("", "", string.punctuation))
- # clear_text = clear_text.lower().split(" ")
- # positions = [index for index, item in enumerate(clear_text) if item == street_name]
-
- # if not positions:
- # return None
-
- # position = positions[0]
- # search_start = max(0, position - int(START_INDEX_POSITION))
- # search_end = min(len(clear_text), position + int(END_INDEX_POSITION))
-
- # ad_result = []
- # for i in range(search_start, min(search_end + 1, len(clear_text))):
- # word = clear_text[i]
- # normal_form = morph.parse(word)[0].normal_form
- # if normal_form in TARGET_TOPONYMS:
- # ad_result.append(REPLACEMENT_DICT.get(normal_form, normal_form))
-
- # if ad_result:
- # return ad_result[0]
- # else:
- # return None
-
- # def get_street(self, df: pd.DataFrame, text_column: str) -> gpd.GeoDataFrame:
- # """
- # Function calls NER model and post-process result in order to extract
- # the address mentioned in the text.
- # """
- # logger.info("get_street started")
-
- # df[text_column].dropna(inplace=True)
- # df[text_column] = df[text_column].astype(str)
-
- # logger.info("extract_ner_street started")
-
- # df[["Street", "Score"]] = df[text_column].progress_apply(lambda t: self.extract_ner_street(t))
- # df["Street"] = df[[text_column, "Street"]].progress_apply(
- # lambda row: Geocoder.get_ner_address_natasha(row, text_column),
- # axis=1,
- # )
-
- # df = df[df.Street.notna()]
- # df = df[df["Street"].str.contains("[а-яА-Я]")]
-
- # logger.info("pattern1.sub started")
-
- # pattern1 = re.compile(r"(\D)(\d)(\D)")
- # df["Street"] = df["Street"].progress_apply(lambda x: pattern1.sub(r"\1 \2\3", x))
-
- # logger.info("pattern2.findall started")
-
- # pattern2 = re.compile(r"\d+")
- # df["Numbers"] = df["Street"].progress_apply(lambda x: " ".join(pattern2.findall(x)))
-
- # logger.info("pattern2.sub started")
-
- # df["Street"] = df["Street"].progress_apply(lambda x: pattern2.sub("", x).strip())
-
- # df["initial_street"] = df["Street"].copy()
-
- # df["Street"] = df["Street"].str.lower()
-
- # logger.info("extract_building_num started")
-
- # df["Numbers"] = df.progress_apply(
- # lambda row: Geocoder.extract_building_num(row[text_column], row["Street"], row["Numbers"]),
- # axis=1,
- # )
-
- # logger.info("extract_toponym started")
-
- # df["Toponims"] = df.progress_apply(
- # lambda row: Geocoder.extract_toponym(row[text_column], row["Street"]),
- # axis=1,
- # )
- # return df
-
- # df = pd.DataFrame(data={'text': 'На биржевой 14 что-то произошло'}, index=[0])
- # print(Geocoder().run(df=df, text_column='text'))
-
-
- # def find_word_form(self, df: pd.DataFrame, strts_df: pd.DataFrame) -> pd.DataFrame:
- # """
- # In Russian language any word has different forms.
- # Since addresses are extracted from the texts in social networks,
- # they might be in any possible form. This function is aimed to match that
- # free form to the one that is used in the OSM database.
-
- # Since the stem is found there would be several streets with that stem
- # in their name.
- # However, the searching street name has its specific ending (form) and
- # not each matched street name could have it.
- # """
-
- # df["full_street_name"] = None
-
- # for idx, row in df.iterrows():
- # search_val = row["Street"]
- # search_top = row["Toponims"]
- # val_num = row["Numbers"]
-
- # for col in strts_df.columns[2:]:
- # search_rows = strts_df.loc[strts_df[col] == search_val]
- # matching_rows = search_rows[search_rows["toponim_name"] == search_top]
-
- # if not matching_rows.empty:
- # only_streets_full = matching_rows["street"].values
- # streets_full = [
- # street + f" {val_num}" + f" {self.osm_city_name}" + " Россия" for street in only_streets_full
- # ]
-
- # df.loc[idx, "full_street_name"] = ",".join(streets_full)
- # df.loc[idx, "only_full_street_name"] = ",".join(only_streets_full)
-
- # else:
- # if search_val in strts_df[col].values:
- # only_streets_full = strts_df.loc[strts_df[col] == search_val, "street"].values
- # streets_full = [
- # street + f" {val_num}" + f" {self.osm_city_name}" + " Россия"
- # for street in only_streets_full
- # ]
-
- # df.loc[idx, "full_street_name"] = ",".join(streets_full)
- # df.loc[idx, "only_full_street_name"] = ",".join(only_streets_full)
-
- # df.dropna(subset=["full_street_name", "only_full_street_name"], inplace=True)
- # df["location_options"] = df["full_street_name"].str.split(",")
- # df["only_full_street_name"] = df["only_full_street_name"].str.split(",")
-
- # tmp_df_1 = df["location_options"].explode()
- # tmp_df_1.name = "addr_to_geocode"
- # tmp_df_2 = df["only_full_street_name"].explode()
- # tmp_df_2.name = "only_full_street_name"
- # new_df = tmp_df_1.to_frame().join(tmp_df_2.to_frame())
-
- # df.drop(columns=["only_full_street_name"], inplace=True)
- # df = df.merge(new_df, left_on=df.index, right_on=new_df.index)
- # df.drop(columns=["key_0"], inplace=True)
-
- # # new_df = df["only_full_street_name"].explode()
- # # new_df.name = "only_full_street_name"
- # # df.drop(columns=['key_0', 'only_full_street_name'], inplace=True)
- # # df = df.merge(new_df, left_on=df.index, right_on=new_df.index)
-
- # # print(df.head())
- # df["only_full_street_name"] = df["only_full_street_name"].astype(str)
- # df["location_options"] = df["location_options"].astype(str)
-
- # return df
-
-import pandas as pd
-import re
-from tqdm import tqdm
-from loguru import logger
-from typing import Tuple, List, Optional
-
-import string
-import math
-from typing import Optional
-from loguru import logger
-from flair.data import Sentence
-
-# Initialize morphological analyzer (use the correct library for your context)
-import pymorphy2
-morph = pymorphy2.MorphAnalyzer()
-
-from sloyka.src.geocoder.text_address_extractor_by_rules import NatashaExtractor
-from sloyka.src.utils.constants import (
- START_INDEX_POSITION,
- REPLACEMENT_DICT,
- TARGET_TOPONYMS,
- END_INDEX_POSITION,
- SCORE_THRESHOLD
-)
-
-
-
-[docs]
-class StreetExtractor:
-
- extractor = NatashaExtractor()
-
-
-[docs]
- @staticmethod
- def process_pipeline(df: pd.DataFrame, text_column: str, classifier) -> pd.DataFrame:
-        """
-        Execute the address extraction pipeline on the DataFrame.
-
-        Args:
-            df (pd.DataFrame): DataFrame containing the text data.
-            text_column (str): Column name in the DataFrame with text data for address extraction.
-            classifier: Pre-trained flair SequenceTagger used for NER street extraction.
-
-        Returns:
-            pd.DataFrame: DataFrame with extracted street addresses and additional processing columns.
-        """
-        local_df = df.copy()
- texts = StreetExtractor._preprocess_text_column(local_df, text_column)
- extracted_streets = StreetExtractor._extract_streets(texts, classifier)
- refined_streets = StreetExtractor._refine_street_data(extracted_streets)
- building_numbers = StreetExtractor._extract_building_numbers(texts, refined_streets)
- toponyms = StreetExtractor._extract_toponyms(texts, refined_streets)
-
- # Combine results into a DataFrame
- processed_df = pd.DataFrame({
- text_column: texts,
- 'Street': refined_streets,
- 'Numbers': building_numbers,
- 'Toponyms': toponyms
- })
-
- StreetExtractor._check_df_len_didnt_change(local_df, processed_df)
-
- return processed_df
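# Hedged usage sketch, kept as comments because it needs a loaded flair model
# (e.g. the tagger referenced in Geocoder.__init__ elsewhere in this package):
#
#   from flair.models import SequenceTagger
#   tagger = SequenceTagger.load("Geor111y/flair-ner-addresses-extractor")
#   posts = pd.DataFrame({"text": ["Прорвало трубу на Невском 112"]})
#   out = StreetExtractor.process_pipeline(posts, "text", tagger)
#   out[["Street", "Numbers", "Toponyms"]]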
-
-
- @staticmethod
- def _check_df_len_didnt_change(df1, df2):
- try:
- assert len(df1) == len(df2)
- except Exception as e:
- logger.critical('dfs lengths differ')
- raise e
-
-
- @staticmethod
- def _preprocess_text_column(df: pd.DataFrame, text_column: str) -> List[str]:
- """
- Preprocess the text column by ensuring non-null values and converting to string type.
-
- Args:
- df (pd.DataFrame): DataFrame containing the text data.
- text_column (str): Column name in the DataFrame with text data.
-
- Returns:
- List[str]: List of preprocessed text entries.
- """
- try:
- text_series = df[text_column].dropna().astype(str)
- return text_series.tolist()
- except Exception as e:
- logger.warning(f"Error in _preprocess_text_column: {e}")
- return []
-
- @staticmethod
- def _extract_streets(texts: List[str], classifier) -> List[Tuple[Optional[str], Optional[float]]]:
- """
- Extract street names from the text column using NER model.
-
- Args:
- texts (List[str]): List of text entries.
-
- Returns:
- List[Tuple[Optional[str], Optional[float]]]: List of tuples with extracted street names and confidence scores.
- """
- tqdm.pandas()
- extracted_streets = []
- for text in tqdm(texts):
- try:
- extracted_streets.append(StreetExtractor.extract_ner_street(text, classifier))
- except Exception as e:
- logger.warning(f"Error extracting NER street from text '{text}': {e}")
- extracted_streets.append((None, None))
- return extracted_streets
-
- @staticmethod
- def _refine_street_data(street_data: List[Tuple[Optional[str], Optional[float]]]) -> List[Optional[str]]:
- """
- Refine street data by normalizing and cleaning up street names.
-
- Args:
- street_data (List[Tuple[Optional[str], Optional[float]]]): List of tuples with extracted street names and confidence scores.
-
- Returns:
- List[Optional[str]]: List of refined street names.
- """
- refined_streets = []
- for street, _ in street_data:
- if street:
- try:
- refined_streets.append(StreetExtractor._refine_street_name(street))
- except Exception as e:
- logger.warning(f"Error refining street '{street}': {e}")
- refined_streets.append(None)
- else:
- refined_streets.append(None)
- return refined_streets
-
- @staticmethod
- def _refine_street_name(street: str) -> str:
- """
- Refine street name by normalizing and cleaning up the street string.
-
- Args:
- street (str): Raw street name.
-
- Returns:
- str: Refined street name.
- """
- try:
- street = re.sub(r"(\D)(\d)(\D)", r"\1 \2\3", street)
- street = re.sub(r"\d+", "", street).strip().lower()
- return street
- except Exception as e:
- logger.warning(f"Error in _refine_street_name with street '{street}': {e}")
- return ""
-
- @staticmethod
- def _extract_building_numbers(texts: List[str], streets: List[Optional[str]]) -> List[Optional[str]]:
- """
- Extract building numbers from the text data.
-
- Args:
- texts (List[str]): List of text entries.
- streets (List[Optional[str]]): List of refined street names.
-
- Returns:
- List[Optional[str]]: List of extracted building numbers.
- """
- building_numbers = []
- for text, street in zip(texts, streets):
- if street:
- try:
- building_numbers.append(StreetExtractor._extract_building_number(text, street))
- except Exception as e:
- logger.warning(f"Error extracting building number from text '{text}' with street '{street}': {e}")
- building_numbers.append(None)
- else:
- building_numbers.append(None)
- return building_numbers
-
- @staticmethod
- def _extract_building_number(text: str, street: str) -> str:
- """
- Extract building number from the text.
-
- Args:
- text (str): Input text for address extraction.
- street (str): Extracted and refined street name.
-
- Returns:
- str: Extracted building number.
- """
- try:
- numbers = " ".join(re.findall(r"\d+", text))
- return StreetExtractor.extract_building_num(text, street, numbers)
- except Exception as e:
- logger.warning(f"Error in _extract_building_number with text '{text}' and street '{street}': {e}")
- return ""
-
- @staticmethod
- def _extract_toponyms(texts: List[str], streets: List[Optional[str]]) -> List[Optional[str]]:
- """
- Extract toponyms from the text data.
-
- Args:
- texts (List[str]): List of text entries.
- streets (List[Optional[str]]): List of refined street names.
-
- Returns:
- List[Optional[str]]: List of extracted toponyms.
- """
- toponyms = []
- for text, street in zip(texts, streets):
- if street:
- try:
- toponyms.append(StreetExtractor.extract_toponym(text, street))
- except Exception as e:
- logger.warning(f"Error extracting toponym from text '{text}' with street '{street}': {e}")
- toponyms.append(None)
- else:
- toponyms.append(None)
- return toponyms
-
-
-
-[docs]
- @staticmethod
- def extract_toponym(text: str, street_name: str) -> Optional[str]:
- """
- Extract toponyms near the specified street name in the text.
-
- This function identifies the position of a street name in the text and searches for related toponyms
- within a specified range around the street name.
-
- Args:
- text (str): The text containing the address.
- street_name (str): The name of the street to search around.
-
- Returns:
- Optional[str]: The first toponym found if present, otherwise None.
- """
- try:
- # Handle the case where text is NaN
- if isinstance(text, float) and math.isnan(text):
- return None
-
- # Clean and split the text into words
- cleaned_text = StreetExtractor._clean_text(text)
- words = cleaned_text.split()
-
- # Find positions of the street name
- positions = StreetExtractor._find_street_name_positions(words, street_name)
- if not positions:
- return None
-
- # Search for toponyms in the range around the street name
- toponym = StreetExtractor._search_toponyms(words, positions[0])
- return toponym
-
- except Exception as e:
- logger.warning(f"Error in extract_toponym with text '{text}' and street_name '{street_name}': {e}")
- return None
-
-
- @staticmethod
- def _clean_text(text: str) -> str:
- """
- Clean the input text by removing punctuation and converting to lowercase.
-
- Args:
- text (str): The input text.
-
- Returns:
- str: The cleaned text.
- """
- return text.translate(str.maketrans("", "", string.punctuation)).lower()
-
- @staticmethod
- def _find_street_name_positions(words: List[str], street_name: str) -> List[int]:
- """
- Find positions of the street name in the list of words.
-
- Args:
- words (List[str]): List of words from the cleaned text.
- street_name (str): The name of the street to find.
-
- Returns:
- List[int]: List of positions where the street name occurs.
- """
- return [index for index, word in enumerate(words) if word == street_name]
-
- @staticmethod
- def _search_toponyms(words: List[str], position: int) -> Optional[str]:
- """
- Search for toponyms within a specified range around the given position.
-
- Args:
- words (List[str]): List of words from the cleaned text.
- position (int): The position around which to search for toponyms.
-
- Returns:
- Optional[str]: The first toponym found if present, otherwise None.
- """
- search_start = max(0, position - START_INDEX_POSITION)
- search_end = min(len(words), position + END_INDEX_POSITION)
-
-        for i in range(search_start, min(search_end + 1, len(words))):
- word = words[i]
- try:
- normal_form = morph.parse(word)[0].normal_form
- except Exception as e:
- logger.warning(f"Error parsing word '{word}': {e}")
- continue
-
- if normal_form in TARGET_TOPONYMS:
- return REPLACEMENT_DICT.get(normal_form, normal_form)
-
- return None
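# The normalisation step relied on above, using the module-level pymorphy2
# analyzer (TARGET_TOPONYMS and REPLACEMENT_DICT come from sloyka constants,
# so only the parsing is shown):
print(morph.parse("проспекте")[0].normal_form)  # 'проспект'
print(morph.parse("площади")[0].normal_form)    # 'площадь'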
-
-
-[docs]
- @staticmethod
- def extract_building_num(text: str, street_name: str, number: Optional[str]) -> str:
- """
- Extract building numbers near the specified street name in the text.
-
- This function identifies the position of a street name in the text and searches for related building numbers
- within a specified range of indexes around the street name.
-
- Args:
- text (str): The text containing the address.
- street_name (str): The name of the street to search around.
- number (Optional[str]): Previously extracted building number.
-
- Returns:
- str: The first building number found if present, otherwise an empty string.
- """
- try:
- if pd.notna(number) and number != "":
- return number
-
- if isinstance(text, float) and math.isnan(text):
- return ""
-
- cleaned_text = StreetExtractor._clean_text(text)
- words = cleaned_text.split()
-
- positions = StreetExtractor._find_street_name_positions(words, street_name)
- if not positions:
- return ""
-
- building_number = StreetExtractor._search_building_number(words, positions[0])
- return building_number
-
- except Exception as e:
- logger.warning(f"Error in extract_building_num with text '{text}', street_name '{street_name}', number '{number}': {e}")
- return ""
-
-
-
-
- @staticmethod
- def _search_building_number(words: List[str], position: int) -> str:
- """
- Search for building numbers within a specified range around the given position.
-
- Args:
- words (List[str]): List of words from the cleaned text.
- position (int): The position around which to search for building numbers.
-
- Returns:
- str: The first building number found if present, otherwise an empty string.
- """
- search_start = max(0, position)
- search_end = min(len(words), position + END_INDEX_POSITION)
-
- for index in range(search_start, search_end):
- word = words[index]
- if StreetExtractor._is_building_number(word):
- return word
-
- return ""
-
- @staticmethod
- def _is_building_number(word: str) -> bool:
- """
- Check if a word is a valid building number.
-
- Args:
- word (str): The word to check.
-
- Returns:
- bool: True if the word is a valid building number, otherwise False.
- """
- return any(character.isdigit() for character in word) and len(word) <= 3
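# The heuristic accepts short tokens containing at least one digit:
print(StreetExtractor._is_building_number("112"))   # True
print(StreetExtractor._is_building_number("12к"))   # True
print(StreetExtractor._is_building_number("1234"))  # False (longer than 3 characters)
print(StreetExtractor._is_building_number("дом"))   # False (no digits)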
-
-#---------
-
-[docs]
- @staticmethod
- def extract_ner_street(text: str, classifier) -> pd.Series:
- """
- Extract street addresses from text using a pre-trained custom NER model.
-
- This function processes text by removing unnecessary content, applies a custom NER model
- to extract mentioned addresses, and returns the address with a confidence score.
-
-        Args:
-            text (str): The input text to process and extract addresses from.
-            classifier: Pre-trained flair SequenceTagger used for prediction.
-
- Returns:
- pd.Series: A Series containing the extracted address and confidence score,
- or [None, None] if extraction fails or the score is below the threshold.
- """
- try:
-            cleaned_text = StreetExtractor._strip_square_brackets(text)
- sentence = Sentence(cleaned_text)
-
- # Predict entities using the classifier
- classifier.predict(sentence)
-
- address, score = StreetExtractor._extract_address_and_score(sentence)
-
- if not address or score < SCORE_THRESHOLD:
- address = StreetExtractor.extractor.get_ner_address_natasha(text)
- if address:
- score = 1
-
- # Return the result if the score is above the threshold
- return pd.Series([address, score] if score is not None and score > SCORE_THRESHOLD else [None, None])
-
- except Exception as e:
- logger.warning(f"Error in extract_ner_street with text '{text}': {e}")
- return pd.Series([None, None])
-
-
-    @staticmethod
-    def _strip_square_brackets(text: str) -> str:
-        """
-        Remove square-bracketed fragments (e.g. bracketed user mentions) before NER
-        prediction. Unlike _clean_text above, punctuation and case are left untouched.
-
- Args:
- text (str): The input text.
-
- Returns:
- str: The cleaned text.
- """
- try:
- return re.sub(r"\[.*?\]", "", text)
- except Exception as e:
-            logger.warning(f"Error in _strip_square_brackets with text '{text}': {e}")
- return text
-
- @staticmethod
- def _extract_address_and_score(sentence: Sentence) -> Tuple[Optional[str], Optional[float]]:
- """
- Extract address and score from the NER model's predictions.
-
- Args:
- sentence (Sentence): The Sentence object containing NER predictions.
-
- Returns:
- Tuple[Optional[str], Optional[float]]: Extracted address and its confidence score.
- """
- try:
- labels = sentence.get_labels("ner")
- if labels:
- label = labels[0]
- address = StreetExtractor._parse_address(label.labeled_identifier)
- score = round(label.score, 3)
- return address, score
- return None, None
- except IndexError as e:
- logger.warning(f"Error in _extract_address_and_score: {e}")
- return None, None
-
- @staticmethod
- def _parse_address(label_value: str) -> str:
- """
- Parse the address from the label value string.
-
- Args:
- label_value (str): The labeled identifier from the NER model.
-
- Returns:
- str: Parsed address.
- """
- try:
- return label_value.split("]: ")[1].split("/")[0].replace('"', "")
- except IndexError as e:
- logger.warning(f"Error in _parse_address with label value '{label_value}': {e}")
- return ""
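# A label value in the shape this parser expects (hypothetical example string,
# shown only to make the split logic concrete):
print(StreetExtractor._parse_address('Span[2:4]: "невский проспект"/LOC'))
# 'невский проспект'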
-
-
-
-
-"""
-This module contains the EmotionRecognizer class, which categorises input texts into emotion categories.
-By default it uses the Aniemore HuggingFace transformer model Bert_Large.
-The EmotionRecognizer class has the following methods:
-@method:recognize_emotion: Adding an emotion category.
-@method:recognize_average_emotion_from_multiple_models: Adding an average emotion category or the most likely emotion
-category using multiple models.
-"""
-
-from aniemore.recognizers.text import TextRecognizer
-from aniemore.models import HuggingFaceModel
-import torch
-import pandas as pd
-from tqdm import tqdm
-import gc
-
-
-
-[docs]
-class EmotionRecognizer:
- """
- This class is designed to categorise input texts into emotion categories.
-
- Attributes:
-
- - model: This attribute holds the model used for emotion recognition. It defaults to HuggingFaceModel.Text.Bert_Large,
- but can be set to any other compatible model during the instantiation of the class.
-
- - device: the device to use for inference. It automatically selects 'cuda' (GPU) if a compatible GPU
- is available and CUDA is enabled, otherwise, it falls back to 'cpu'.
-
- - text: The text to be analyzed.
-
- - df: The DataFrame containing the text to be analyzed.
-
- - text_column: The name of the column containing the text to be analyzed.
- """
-
- def __init__(self, model_name=HuggingFaceModel.Text.Bert_Large, device=None):
- self.device = device if device is not None else ("cuda" if torch.cuda.is_available() else "cpu")
- self.model_name = model_name
-
- # Define the default model names to avoid repeated initialization
- self.default_model_names = [
- HuggingFaceModel.Text.Bert_Tiny,
- HuggingFaceModel.Text.Bert_Base,
- HuggingFaceModel.Text.Bert_Large,
- HuggingFaceModel.Text.Bert_Tiny2,
- ]
-
-
-[docs]
- def recognize_emotion(self, text):
- """
- Return the emotion for a given text.
- """
- recognizer = TextRecognizer(model=self.model_name, device=self.device)
- emotion = recognizer.recognize(text, return_single_label=True)
- return emotion
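# Hedged usage sketch, kept as comments because it downloads an Aniemore model
# on first run:
#
#   recognizer = EmotionRecognizer(device="cpu")
#   recognizer.recognize_emotion("Сегодня отличный день!")  # e.g. 'happiness'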
-
-
-
-[docs]
- def recognize_average_emotion_from_multiple_models(self, df, text_column, models=None, average=True):
- """
- Calculate the prevailing emotion using multiple models for a DataFrame column.
- """
- if models is None:
- models = self.default_model_names
- else:
- # Validate that the provided models are in the default models list
- for model in models:
- if model not in self.default_model_names:
- raise ValueError(
- f"Model {model} is not a valid model. Valid models are: {self.default_model_names}"
- )
-
- # Initialize scores DataFrame
- scores = pd.DataFrame(
- 0, index=df.index, columns=["happiness", "sadness", "anger", "fear", "disgust", "enthusiasm", "neutral"]
- )
-
- # Process each model one by one with progress bar
- for model_name in tqdm(models, desc="Processing models"):
- try:
- print(f"Clearing cache and collecting garbage before loading model: {model_name}")
- torch.cuda.empty_cache()
- gc.collect()
-
- print(f"Loading model: {model_name}")
- recognizer = TextRecognizer(model=model_name, device=self.device)
- model_results = [recognizer.recognize(text, return_single_label=False) for text in df[text_column]]
-
- for idx, result in enumerate(model_results):
- for emotion, score in result.items():
- if average:
- scores.at[df.index[idx], emotion] += score
- else:
- scores.at[df.index[idx], emotion] = max(scores.at[df.index[idx], emotion], score)
-
-                # Remove the model from memory
-                del recognizer
-                torch.cuda.empty_cache()  # Clear the CUDA cache (when a GPU is used)
-                gc.collect()  # Run garbage collection
-                print(f"Model {model_name} processed and unloaded.")
- except Exception as e:
- print(f"Error processing model {model_name}: {e}")
- torch.cuda.empty_cache()
- gc.collect()
-
- if average:
- # Average the scores by the number of models
- scores = scores.div(len(models))
-
- # Determine the prevailing emotion with the highest score
- prevailing_emotions = scores.idxmax(axis=1)
-
- return prevailing_emotions
-
-
-
-"""
-This module contains the EventDetection class, which generates events and their connections by applying
-a semantic clustering method (BERTopic) to texts in the context of an urban spatial model.
-
-The EventDetection class has the following methods:
-
-@method:_get_roads:
-Get the road network of a city as road links and roads.
-
-@method:_get_buildings:
-Get the buildings of a city as a GeoDataFrame.
-
-@method:_collect_population:
-Collect population data for each object (building, street, link).
-
-@method:_preprocess:
-Preprocess the data.
-"""
-import re
-from itertools import chain, combinations
-
-import geopandas as gpd
-import osmnx as ox
-import pandas as pd
-import numpy as np
-from bertopic import BERTopic
-from hdbscan import HDBSCAN
-from shapely.geometry import LineString
-from transformers.pipelines import pipeline
-from umap import UMAP
-
-
-
-[docs]
-class EventDetection:
-    """
-    This class generates events and their connections.
-    It applies a semantic clustering method (BERTopic) to texts
-    in the context of an urban spatial model.
-    """
-
- def __init__(self):
- np.random.seed(42)
- self.population_filepath = None
- self.levels = ["building", "link", "road", "global"]
- self.levels_scale = dict(zip(self.levels, list(range(2, 10, 2))))
- self.functions_weights = {
- "Безопасность": 0.12,
- "Благоустройство": 0.21,
- "Дороги": 0.18,
- "ЖКХ": 0.2,
- "Здравоохранение": 0.2,
- "Другое": 0.16,
- "Образование": 0.16,
- "Социальная защита": 0.13,
- "Строительство": 0.19,
- "Обращение с отходами": 0.19,
- "Транспорт": 0.17,
- "Экология": 0.22,
- "Энергетика": 0.19,
- }
- self.messages = None
- self.links = None
- self.buildings = None
- self.population = None
- self.topic_model = None
- self.events = None
- self.connections = None
-
- def _get_roads(self, city_name, city_crs) -> gpd.GeoDataFrame:
- """
- Get the road network of a city as road links and roads.
- Args:
- city_name (string): The name of the city.
- city_crs (int): The spatial reference code (CRS) of the city.
- Returns:
- links (GeoDataFrame): GeoDataFrame with the city's road links and roads.
- """
- links = ox.graph_from_place(city_name, network_type="drive")
- links = ox.utils_graph.graph_to_gdfs(links, nodes=False).to_crs(city_crs)
- links = links.reset_index(drop=True)
- links["link_id"] = links.index
- links["geometry"] = links["geometry"].buffer(7)
- links = links.to_crs(4326)
- links = links[["link_id", "name", "geometry"]]
- links.loc[links["name"].map(type) == list, "name"] = links[links["name"].map(type) == list]["name"].map(
- lambda x: ", ".join(x)
- )
- road_id_name = dict(enumerate(links.name.dropna().unique().tolist()))
- road_name_id = {v: k for k, v in road_id_name.items()}
- links["road_id"] = links["name"].replace(road_name_id)
- return links
-
- def _get_buildings(self) -> gpd.GeoDataFrame:
-        """
-        Get the buildings of a city as a GeoDataFrame.
-        Reads the GeoJSON file at self.population_filepath and joins each building
-        to the nearest road link in self.links (within 500 m).
-        Returns:
-            buildings (GeoDataFrame): GeoDataFrame with the city's buildings.
-        """
- buildings = gpd.read_file(self.population_filepath)
- buildings = buildings[["address", "building_id", "population_balanced", "geometry"]]
- buildings = buildings.to_crs(4326)
- buildings["building_id"] = buildings.index
- buildings = (
- gpd.sjoin_nearest(
- buildings,
- self.links[["link_id", "road_id", "geometry"]],
- how="left",
- max_distance=500,
- )
- .drop(columns=["index_right"])
- .drop_duplicates(subset="building_id")
- )
- self.buildings = buildings
- return buildings
-
- def _collect_population(self) -> dict:
- """
- Collect population data for each object (building, street, link).
- """
- buildings = self.buildings.copy()
- pops_global = {0: buildings.population_balanced.sum()}
- pops_buildings = buildings["population_balanced"].to_dict()
- pops_links = (
- buildings[["population_balanced", "link_id"]].groupby("link_id").sum()["population_balanced"].to_dict()
- )
- pops_roads = (
- buildings[["population_balanced", "road_id"]].groupby("road_id").sum()["population_balanced"].to_dict()
- )
- pops = {
- "global": pops_global,
- "road": pops_roads,
- "link": pops_links,
- "building": pops_buildings,
- }
- self.population = pops
- return pops
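A toy illustration of the aggregation pattern used above (group population by an object id, sum it, and convert to a plain dict); the numbers are made up.

import pandas as pd

# three buildings sitting on two road links (made-up values)
buildings = pd.DataFrame({"population_balanced": [120, 80, 200], "link_id": [0, 0, 1]})

pops_links = buildings.groupby("link_id")["population_balanced"].sum().to_dict()
print(pops_links)  # {0: 200, 1: 200}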
-
- def _preprocess(self) -> gpd.GeoDataFrame:
- """
- Preprocess the data
- """
- messages = self.messages[
- [
- "Текст комментария",
- "geometry",
- "Дата и время",
- "message_id",
- "cats",
- ]
- ]
- messages = messages.sjoin(self.buildings, how="left")[
- [
- "Текст комментария",
- "address",
- "geometry",
- "building_id",
- "message_id",
- "Дата и время",
- "cats",
- ]
- ]
- messages.rename(
- columns={"Текст комментария": "text", "Дата и время": "date_time"},
- inplace=True,
- )
- messages = messages.sjoin(self.links, how="left")[
- [
- "text",
- "geometry",
- "building_id",
- "index_right",
- "name",
- "message_id",
- "date_time",
- "cats",
- "road_id",
- ]
- ]
- messages.rename(
- columns={"index_right": "link_id", "name": "road_name"},
- inplace=True,
- )
- messages = messages.join(
- self.buildings[["link_id", "road_id"]],
- on="building_id",
- rsuffix="_from_building",
- )
- messages.loc[messages.link_id.isna(), "link_id"] = messages.loc[messages.link_id.isna()][
- "link_id_from_building"
- ]
- messages.loc[messages.road_id.isna(), "road_id"] = messages.loc[messages.road_id.isna()][
- "road_id_from_building"
- ]
- messages = messages[
- [
- "message_id",
- "text",
- "geometry",
- "building_id",
- "link_id",
- "road_id",
- "date_time",
- "cats",
- ]
- ].dropna(subset="text")
- messages["cats"] = messages.cats.astype(str).str.split("; ").map(lambda x: x[0])
- messages["importance"] = messages["cats"].map(self.functions_weights)
- messages["importance"].fillna(0.16, inplace=True)
- messages["global_id"] = 0
- return messages
-
- def _create_model(self, min_event_size):
- """
- Create a topic model with a UMAP, HDBSCAN, and a BERTopic model.
- """
- umap_model = UMAP(
- n_neighbors=15,
- n_components=5,
- min_dist=0.0,
- metric="cosine",
- random_state=42,
- )
- hdbscan_model = HDBSCAN(
- min_cluster_size=min_event_size,
- min_samples=1,
- metric="euclidean",
- cluster_selection_method="eom",
- prediction_data=True,
- )
- embedding_model = pipeline("feature-extraction", model="cointegrated/rubert-tiny2")
- topic_model = BERTopic(
- embedding_model=embedding_model,
- hdbscan_model=hdbscan_model,
- umap_model=umap_model,
- calculate_probabilities=True,
- verbose=True,
- n_gram_range=(1, 3),
- )
- return topic_model
-
- def _event_from_object(
- self,
- messages,
- topic_model,
- target_column: str,
- population: dict,
- object_id: float,
- event_level: str,
- ):
- """
- Create a list of events for a given object
- (building, street, link, total).
- """
- local_messages = messages[messages[target_column] == object_id]
- message_ids = local_messages.message_id.tolist()
- docs = local_messages.text.tolist()
- if len(docs) >= 5:
- try:
- topics, probs = topic_model.fit_transform(docs)
- except TypeError:
- print("Can't reduce dimensionality or some other problem")
- return
- try:
- topics = topic_model.reduce_outliers(docs, topics)
- topic_model.update_topics(docs, topics=topics)
- except ValueError:
- print("Can't distribute all messages in topics")
- event_model = topic_model.get_topic_info()
- event_model["level"] = event_level
- event_model["object_id"] = str(object_id)
- event_model["id"] = event_model.apply(
- lambda x: f"{str(x.Topic)}_{str(x.level)}_{str(x.object_id)}",
- axis=1,
- )
- try:
- event_model["potential_population"] = population[event_level][object_id]
- except KeyError: # no population recorded for this object at this level; fall back to the global total
- event_model["potential_population"] = population["global"][0]
-
- clustered_messages = pd.DataFrame(data={"id": message_ids, "text": docs, "topic_id": topics})
- event_model["message_ids"] = [
- clustered_messages[clustered_messages["topic_id"] == topic]["id"].tolist()
- for topic in event_model.Topic
- ]
- event_model["duration"] = event_model.message_ids.map(
- lambda x: (
- pd.to_datetime(messages[messages["message_id"].isin(x)].date_time).max()
- - pd.to_datetime(messages[messages["message_id"].isin(x)].date_time).min()
- ).days
- )
- event_model["category"] = event_model.message_ids.map(
- lambda x: ", ".join(messages[messages["message_id"].isin(x)].cats.mode().tolist())
- )
- event_model["importance"] = event_model.message_ids.map(
- lambda x: messages[messages["message_id"].isin(x)].importance.mean()
- )
- return event_model
- else:
- return
-
- def _get_events(self, min_event_size) -> gpd.GeoDataFrame:
- """
- Create a list of events for all levels.
- """
- messages = self.messages.copy()
- messages_list = messages.text.tolist()
- index_list = messages.message_id.tolist()
- pops = self._collect_population()
- topic_model = self._create_model(min_event_size)
- events = [
- [
- self._event_from_object(messages, topic_model, f"{level}_id", pops, oid, level)
- for oid in messages[f"{level}_id"].unique().tolist()
- ]
- for level in reversed(self.levels)
- ]
- events = [item for sublist in events for item in sublist if item is not None]
- events = pd.concat(list(chain(events)))
- events["geometry"] = events.message_ids.map(
- lambda x: messages[messages.message_id.isin(x)].geometry.unary_union.representative_point()
- )
- events = gpd.GeoDataFrame(events, geometry="geometry").set_crs(4326)
- events.rename(
- columns={
- "Name": "name",
- "Representative_Docs": "docs",
- "Count": "intensity",
- "potential_population": "population",
- },
- inplace=True,
- )
- events["docs"] = events["docs"].map(
- lambda x: ", ".join([str(index_list[messages_list.index(text)]) for text in x])
- )
- events.message_ids = events.message_ids.map(lambda x: ", ".join([str(id) for id in x]))
- events["intensity"] = (events["intensity"] - events["intensity"].min()) / (
- events["intensity"].max() - events["intensity"].min()
- )
- events["duration"] = (events["duration"] - events["duration"].min()) / (
- events["duration"].max() - events["duration"].min()
- )
- events.loc[events.intensity == 0, "intensity"] = 0.1 # fix later
- events.loc[events.duration.isna(), "duration"] = 1 # fix later
- events["risk"] = events.intensity * events.duration * events.importance * events.population
- events["message_ids"] = events.message_ids.map(lambda x: ", ".join(list(set(x.split(", ")))))
- events["docs"] = events.docs.map(lambda x: ", ".join(list(set(x.split(", ")))))
- return events
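For intuition, the risk score multiplies the normalized intensity and duration of an event by the weight of its prevailing category and the normalized population it can affect; a quick worked example with assumed values:

intensity = 0.4    # normalized number of messages in the event
duration = 0.25    # normalized time span of the event in days
importance = 0.22  # weight of the prevailing category (here "Экология")
population = 0.6   # normalized population potentially affected

risk = intensity * duration * importance * population
print(round(risk, 4))  # 0.0132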
-
- def _get_event_connections(self) -> gpd.GeoDataFrame:
- """
- Create a list of connections between events.
- """
- events = self.events.copy()
- events.index = events.id
- events.geometry = events.centroid
- # message_ids are comma-separated strings at this point, so split them back into id sets before intersecting
- weights = [len(set(a.split(", ")) & set(b.split(", "))) for a, b in combinations(self.events.message_ids, 2)]
- nodes = [c for c in combinations(events.id, 2)]
- connections = pd.DataFrame(nodes, weights).reset_index()
- connections.columns = ["weight", "a", "b"]
- connections = connections[connections["weight"] > 0]
- connections = connections.join(events.geometry, on="a", rsuffix="_")
- connections = connections.join(events.geometry, on="b", rsuffix="_")
- events.reset_index(drop=True, inplace=True)
- connections["geometry"] = connections.apply(lambda x: LineString([x["geometry"], x["geometry_"]]), axis=1)
- connections.drop(columns=["geometry_"], inplace=True)
- connections = gpd.GeoDataFrame(connections, geometry="geometry").set_crs(32636)
- return connections
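A toy illustration of how connection weights are derived: two events are linked when their comma-separated message id strings share at least one id. The event ids and message ids below are made up.

from itertools import combinations

events = {
    "0_building_12": "101, 102, 103",
    "0_link_4": "103, 104",
    "1_road_2": "200, 201",
}

for (id_a, msgs_a), (id_b, msgs_b) in combinations(events.items(), 2):
    weight = len(set(msgs_a.split(", ")) & set(msgs_b.split(", ")))
    if weight > 0:
        print(id_a, "<->", id_b, "shared messages:", weight)
# 0_building_12 <-> 0_link_4 shared messages: 1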
-
- def _rebalance(self, connections, events, levels, event_population: int, event_id: str):
- """
- Rebalance the population of an event.
- """
- connections_of_event = connections[connections.a == event_id].b
- if len(connections_of_event) > 0:
- accounted_pops = events[events.id.isin(connections_of_event) & events.level.isin(levels)].population.sum()
- if event_population >= accounted_pops:
- rebalanced_pops = event_population - accounted_pops
- else:
- connections_of_event = connections[connections.b == event_id].a
- accounted_pops = events[
- events.id.isin(connections_of_event) & events.level.isin(levels)
- ].population.sum()
- rebalanced_pops = event_population - accounted_pops
- return rebalanced_pops
- else:
- return event_population
-
- def _rebalance_events(self) -> gpd.GeoDataFrame:
- """
- Rebalance the population of events.
- """
- levels = self.levels.copy()
- events = self.events.copy()
- connections = self.connections.copy()
- events_rebalanced = []
- for level in levels[1:]:
- levels_to_account = levels[: levels.index(level)]
- events_for_level = events[events.level == level]
- events_for_level["rebalanced_population"] = events_for_level.apply(
- lambda x: self._rebalance(
- connections,
- events,
- levels_to_account,
- x.population,
- x.id,
- ),
- axis=1,
- )
- events_rebalanced.append(events_for_level)
- events_rebalanced = pd.concat(events_rebalanced)
- events_rebalanced.loc[
- events_rebalanced.rebalanced_population.isna(),
- "rebalanced_population",
- ] = 0
- events_rebalanced["population"] = events_rebalanced.rebalanced_population
- events_rebalanced.drop(columns=["rebalanced_population"], inplace=True)
- events_rebalanced.population = events_rebalanced.population.astype(int)
- events_rebalanced["population"] = (events_rebalanced["population"] - events_rebalanced["population"].min()) / (
- events_rebalanced["population"].max() - events_rebalanced["population"].min()
- )
- events_rebalanced.loc[events_rebalanced.population == 0, "population"] = 0.01 # fix later
- events_rebalanced.loc[
- events_rebalanced.population.isna() & events_rebalanced.level.isin(["building", "link"]),
- "population",
- ] = 0.01 # fix later
- events_rebalanced.loc[
- events_rebalanced.population.isna() & events_rebalanced.level.isin(["road", "global"]),
- "population",
- ] = 1 # fix later
- events_rebalanced["risk"] = (
- events_rebalanced.intensity * (events_rebalanced.duration + 1) * events_rebalanced.importance
- )
- events_rebalanced = events_rebalanced[["name", "docs", "level", "id", "risk", "message_ids", "geometry"]]
- return events_rebalanced
-
- def _filter_outliers(self):
- """
- Filter outliers.
- """
- pattern = r"^-1.*"
- events = self.events
- connections = self.connections
- print(
- len(events[events.name.map(lambda x: True if re.match(pattern, x) else False)]),
- "outlier clusters of",
- len(events),
- "total clusters. Filtering...",
- )
- events = events[events.name.map(lambda x: False if re.match(pattern, x) else True)]
- connections = connections[connections.a.map(lambda x: False if re.match(pattern, x) else True)]
- connections = connections[connections.b.map(lambda x: False if re.match(pattern, x) else True)]
- return events, connections
-
- def _prepare_messages(self):
- """
- Prepare messages for export.
- """
- messages = self.messages.copy()
- messages = messages.reset_index(drop=True)
- messages.rename(columns={"cats": "block"}, inplace=True)
- messages = messages[["message_id", "text", "geometry", "date_time", "block"]]
- messages = messages.to_crs(4326)
- return messages
-
-
-[docs]
- def run(
- self,
- target_texts: gpd.GeoDataFrame,
- filepath_to_population: str,
- city_name: str,
- city_crs: int,
- min_event_size: int,
- ):
- """
- Runs the full pipeline and returns a GeoDataFrame of messages, a GeoDataFrame
- of events, and a GeoDataFrame of connections between events, in that order.
- """
- self.population_filepath = filepath_to_population
- self.messages = target_texts.copy()
- print("messages loaded")
- self.links = self._get_roads(city_name, city_crs)
- print("road links loaded")
- self.buildings = self._get_buildings()
- print("buildings loaded")
- self.messages = self._preprocess()
- print("messages preprocessed")
- self.events = self._get_events(min_event_size)
- print("events detected")
- self.connections = self._get_event_connections()
- print("connections generated")
- self.events = self._rebalance_events()
- print("population and risk rebalanced")
- self.events, self.connections = self._filter_outliers()
- print("outliers filtered")
- self.messages = self._prepare_messages()
- print("done!")
-
- return self.messages, self.events, self.connections
-
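A sketch of how the class could be driven end to end, assuming messages_gdf is a GeoDataFrame with the expected columns ('Текст комментария', 'Дата и время', 'message_id', 'cats', 'geometry') and that the population file contains buildings with 'address', 'building_id' and 'population_balanced'; the file names, place name, CRS and minimum event size below are placeholders.

import geopandas as gpd

messages_gdf = gpd.read_file("messages.geojson")  # placeholder input file

detector = EventDetection()
messages, events, connections = detector.run(
    target_texts=messages_gdf,
    filepath_to_population="population.geojson",  # placeholder path
    city_name="Saint Petersburg, Russia",         # placeholder place name
    city_crs=32636,                               # placeholder metric CRS
    min_event_size=5,
)
print(len(events), "events,", len(connections), "connections")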
-
-
-"""
-This module contains the TextClassifiers class, which classifies input texts into themes or structured types of events.
-It uses a Hugging Face transformer model fine-tuned from rubert-tiny.
-In many cases the number of messages per theme was too small for efficient training, so synthetic themes based
-on the upper-level categories were used instead (for example, 'unknown_ЖКХ').
-
-Attributes:
-- repository_id (str): The repository ID.
-- number_of_categories (int): The number of categories.
-- device_type (str): The type of device.
-
-The TextClassifiers class has the following methods:
-
-@method:initialize_classifier: Initializes the text classification pipeline with the specified model, tokenizer, and device type.
-
-@method:run_text_classifier_topics:
- Takes a text as input and returns the predicted themes and probabilities.
-
-@method:run_text_classifier:
- Takes a text as input and returns the predicted categories and probabilities.
-"""
-import pandas as pd
-from transformers import pipeline
-from sloyka.src.utils.exceptions import InvalidInputError, ClassifierInitializationError, ClassificationError
-
-
-
-[docs]
-class TextClassifiers:
- def __init__(self, repository_id, number_of_categories=1, device_type=None):
- self.repository_id = repository_id
- self.number_of_categories = number_of_categories
- self.device_type = device_type or -1 # -1 will automatically choose the device based on availability
- self.classifier = None
-
-
-[docs]
- def initialize_classifier(self):
- if not self.classifier:
- try:
- self.classifier = pipeline(
- "text-classification",
- model=self.repository_id,
- tokenizer="cointegrated/rubert-tiny2",
- device=self.device_type,
- )
- except Exception as e:
- raise ClassifierInitializationError(f"Failed to initialize the classifier: {e}")
-
-
-
-[docs]
- def classify_text(self, text, is_topic=False):
- if not isinstance(text, str):
- raise InvalidInputError("Input must be a string.")
-
- self.initialize_classifier()
-
- try:
- predictions = self.classifier(text, top_k=self.number_of_categories)
- preds_df = pd.DataFrame(predictions)
- categories = "; ".join(preds_df["label"].tolist())
- probabilities = "; ".join(preds_df["score"].round(3).astype(str).tolist())
- except Exception as e:
- raise ClassificationError(f"Error during text classification: {e}")
-
- return categories, probabilities
-
-
-
-[docs]
- def run_text_classifier_topics(self, text):
- return self.classify_text(text, is_topic=True)
-
-
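A usage sketch; the repository id and sample text are illustrative placeholders, not a model that is guaranteed to exist.

classifier = TextClassifiers(
    repository_id="your-org/rubert-tiny2-city-themes",  # placeholder model id
    number_of_categories=1,
    device_type=-1,  # CPU; pass a GPU index if one is available
)

categories, probabilities = classifier.classify_text("Во дворе не вывозят мусор уже неделю")
print(categories, probabilities)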
-
-
-
-
-"""
-@class:Semgraph:
-The main class of the semantic graph module. It builds a semantic graph from the provided data
-and parameters.
-It is most convenient to use after extracting data with the geocoder.
-
-The Semgraph class has the following methods:
-
-@method:clean_from_dublicates:
-A function to clean a DataFrame from duplicates based on specified columns.
-
-@method:clean_from_digits:
-Removes digits from the text in the specified column of the input DataFrame.
-
-@method:clean_from_toponyms:
-Clean the text in the specified text column by removing any words that match the toponyms in the name and
-toponym columns.
-
-@method:aggregate_data:
-Creates a new DataFrame by aggregating the data based on the provided text and toponyms columns.
-"""
-
-import nltk
-import pandas as pd
-import geopandas as gpd
-import networkx as nx
-from transformers import BertTokenizer, BertModel # type: ignore
-
-from .g_attrs_adder import add_attributes
-from .keyword_extracter import extract_keywords
-from .semantic_closeness_annotator import get_semantic_closeness
-from .g_text_data_getter import get_tag, get_coordinates, get_text_ids
-from ..utils.data_preprocessing.preprocessor import (
- clean_from_dublicates,
- clean_from_digits,
- clean_from_toponyms,
- clean_from_links,
-)
-
-nltk.download("stopwords")
-
-
-from sloyka.src.utils.constants import TAG_ROUTER
-
-
-
-[docs]
-class Semgraph:
- """
- This is the main class of the semantic graph module.
- It builds a semantic graph from the provided data and parameters.
- It is most convenient to use after extracting data with the geocoder.
-
- Param:
- bert_name: the name of the BERT model to use (default is 'DeepPavlov/rubert-base-cased')
- language: the language of the BERT model (default is 'russian')
- device: the device to use for inference (default is 'cpu')
- """
-
- def __init__(
- self, bert_name: str = "DeepPavlov/rubert-base-cased", language: str = "russian", device: str = "cpu"
- ) -> None:
- self.language = language
- self.device = device
- self.tokenizer = BertTokenizer.from_pretrained(bert_name)
- self.model_name = bert_name
- self.model = BertModel.from_pretrained(bert_name).to(device)
-
-
-[docs]
- @staticmethod
- def convert_df_to_edge_df(
- data: pd.DataFrame | gpd.GeoDataFrame, toponym_column: str, word_info_column: str = "words_score"
- ) -> pd.DataFrame | gpd.GeoDataFrame:
- edge_list = []
-
- for i in data[toponym_column].unique(): # visit each toponym once to avoid duplicating its edges
- current_df = data.loc[data[toponym_column] == i]
- for j in range(len(current_df)):
- toponym = current_df[toponym_column].iloc[j]
- word_nodes = current_df[word_info_column].iloc[j]
-
- for k in word_nodes:
- if k[2] in TAG_ROUTER.keys():
- edge_list.append([toponym, k[0], k[1], TAG_ROUTER[k[2]]])
-
- edge_df = pd.DataFrame(edge_list, columns=["FROM", "TO", "distance", "type"])
-
- return edge_df
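A toy illustration of the expected input shape: words_score holds, per row, a list of (keyword, score, tag) triples, and only triples whose tag is a key of TAG_ROUTER become edges. The data and the "NOUN" tag below are assumptions for the example; the real keys of TAG_ROUTER may differ.

import pandas as pd

toy = pd.DataFrame(
    {
        "only_full_street_name": ["улица Ленина", "улица Ленина"],
        "words_score": [
            [("ремонт", 0.81, "NOUN")],    # tag assumed to be a TAG_ROUTER key
            [("светофор", 0.65, "NOUN")],
        ],
    }
)

edges = Semgraph.convert_df_to_edge_df(toy, toponym_column="only_full_street_name")
print(edges)  # columns: FROM, TO, distance, type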
-
-
-
-[docs]
- def build_graph(
- self,
- data: pd.DataFrame | gpd.GeoDataFrame,
- id_column: str,
- text_column: str,
- text_type_column: str,
- toponym_column: str,
- toponym_name_column: str,
- toponym_type_column: str,
- post_id_column: str,
- parents_stack_column: str,
- directed: bool = True,
- location_column: str | None = None,
- geometry_column: str | None = None,
- key_score_filter: float = 0.6,
- semantic_score_filter: float = 0.75,
- top_n: int = 1,
- ) -> nx.classes.graph.Graph:
- """
- Build a graph based on the provided data.
-
- Args:
- data (pd.DataFrame or gpd.GeoDataFrame): The input data to build the graph from.
- id_column (str): The column containing unique identifiers.
- text_column (str): The column containing text information.
- text_type_column (str): The column indicating the type of text.
- toponym_column (str): The column containing toponym information.
- toponym_name_column (str): The column containing toponym names.
- toponym_type_column (str): The column containing toponym types.
- post_id_column (str): The column containing post identifiers.
- parents_stack_column (str): The column containing parent-child relationships.
- directed (bool): Flag indicating if the graph is directed. Defaults to True.
- location_column (str or None): The column containing location information. Defaults to None.
- geometry_column (str or None): The column containing geometry information. Defaults to None.
- key_score_filter (float): The threshold for key score filtering. Defaults to 0.6.
- semantic_score_filter (float): The threshold for semantic score filtering. Defaults to 0.75.
- top_n (int): The number of top keywords to extract. Defaults to 1.
-
- Returns:
- nx.classes.graph.Graph: The constructed graph.
- """
-
- data = clean_from_dublicates(data, id_column)
-
- data = clean_from_digits(data, text_column)
-
- data = clean_from_toponyms(data, text_column, toponym_name_column, toponym_type_column)
-
- data = clean_from_links(data, text_column)
-
- extracted = extract_keywords(
- data,
- text_column,
- text_type_column,
- toponym_column,
- id_column,
- post_id_column,
- parents_stack_column,
- key_score_filter,
- top_n,
- )
-
- df = extracted[0]
- toponyms_attributes = extracted[1]
- words_attributes = extracted[2]
-
- preprocessed_df = self.convert_df_to_edge_df(data=df, toponym_column=toponym_column)
-
- words_df = get_semantic_closeness(preprocessed_df, "TO", semantic_score_filter)
-
- graph_df = pd.concat([preprocessed_df, words_df], ignore_index=True)
- if directed:
- G = nx.from_pandas_edgelist(
- graph_df, source="FROM", target="TO", edge_attr=["distance", "type"], create_using=nx.DiGraph()
- )
-
- else:
- G = nx.from_pandas_edgelist(graph_df, source="FROM", target="TO", edge_attr=["distance", "type"])
-
- nodes = list(G.nodes())
- attributes = get_tag(nodes, list(set(data[toponym_column])))
-
- nx.set_node_attributes(G, attributes, "tag")
- G = add_attributes(G=G, new_attributes=toponyms_attributes, attribute_tag="counts", toponym_attributes=True)
-
- G = add_attributes(G=G, new_attributes=words_attributes, attribute_tag="counts", toponym_attributes=False)
-
- if isinstance(data, gpd.GeoDataFrame):
- G = get_coordinates(
- G=G,
- geocoded_data=data,
- toponym_column=toponym_column,
- location_column=location_column,
- geometry_column=geometry_column,
- )
-
- G = get_text_ids(G=G, filtered_data=df, toponym_column=toponym_column, text_id_column=id_column)
-
- return G
-
-
-
-[docs]
- def update_graph(
- self,
- G: nx.classes.graph.Graph,
- data: pd.DataFrame | gpd.GeoDataFrame,
- id_column: str,
- text_column: str,
- text_type_column: str,
- toponym_column: str,
- toponym_name_column: str,
- toponym_type_column: str,
- post_id_column: str,
- parents_stack_column: str,
- directed: bool = True,
- counts_attribute: str | None = None,
- location_column: str | None = None,
- geometry_column: str | None = None,
- key_score_filter: float = 0.6,
- semantic_score_filter: float = 0.75,
- top_n: int = 1,
- ) -> nx.classes.graph.Graph:
- """
- Update the input graph based on the provided data, returning the updated graph.
-
- Args:
- G (nx.classes.graph.Graph): The input graph to be updated.
- data (pd.DataFrame or gpd.GeoDataFrame): The input data to update the graph.
- id_column (str): The column containing unique identifiers.
- text_column (str): The column containing text information.
- text_type_column (str): The column indicating the type of text.
- toponym_column (str): The column containing toponym information.
- toponym_name_column (str): The column containing toponym names.
- toponym_type_column (str): The column containing toponym types.
- post_id_column (str): The column containing post identifiers.
- parents_stack_column (str): The column containing parent-child relationships.
- directed (bool): Flag indicating if the graph is directed. Defaults to True.
- counts_attribute (str or None): The attribute to be used for counting. Defaults to None.
- location_column (str or None): The column containing location information. Defaults to None.
- geometry_column (str or None): The column containing geometry information. Defaults to None.
- key_score_filter (float): The threshold for key score filtering. Defaults to 0.6.
- semantic_score_filter (float): The threshold for semantic score filtering. Defaults to 0.75.
- top_n (int): The number of top keywords to extract. Defaults to 1.
-
- Returns:
- nx.classes.graph.Graph: The updated graph.
- """
-
- new_G = self.build_graph(
- data,
- id_column,
- text_column,
- text_type_column,
- toponym_column,
- toponym_name_column,
- toponym_type_column,
- post_id_column,
- parents_stack_column,
- directed,
- location_column,
- geometry_column,
- key_score_filter,
- semantic_score_filter,
- top_n,
- )
-
- joined_G = nx.compose(G, new_G)
-
- if counts_attribute is not None:
- nodes = list(set(G.nodes) & set(new_G.nodes))
- for i in nodes:
- joined_G.nodes[i]["total_counts"] = G.nodes[i][counts_attribute] + new_G.nodes[i]["counts"]
-
- return joined_G
-
-
-
-
-# debugging
-# if __name__ == '__main__':
-# file = open("C:\\Users\\thebe\\Downloads\\test.geojson", encoding='utf-8')
-# test_gdf = gpd.read_file(file)
-#
-# sm = Semgraph()
-#
-# G = sm.build_graph(test_gdf[:3000],
-# id_column='id',
-# text_column='text',
-# text_type_column='type',
-# toponym_column='only_full_street_name_numbers',
-# toponym_name_column='initial_street',
-# toponym_type_column='Toponims',
-# post_id_column='post_id',
-# parents_stack_column='parents_stack',
-# location_column='Location',
-# geometry_column='geometry')
-#
-# # print(len(G.nodes))
-# #
-# # G = sm.update_graph(G,
-# # test_gdf[3000:],
-# # id_column='id',
-# # text_column='text',
-# # text_type_column='type',
-# # toponym_column='only_full_street_name',
-# # toponym_name_column='initial_street',
-# # toponym_type_column='Toponims',
-# # post_id_column='post_id',
-# # parents_stack_column='parents_stack',
-# # counts_attribute='counts',
-# # location_column='Location',
-# # geometry_column='geometry')
-# #
-# # print(len(G.nodes))
-# #
-# # nx.write_graphml(G, 'name.graphml', encoding='utf-8')
-
-"""
-This module contains classes for retrieving and working with various types of data.
-
-@class:GeoDataGetter:
-This class is used to retrieve geospatial data from OpenStreetMap (OSM) based on given OSM ID and tags.
-
-@class:VKParser:
-A class for parsing and working with VK comments and posts. Combines posts and comments into one dataframe.
-
-@class:Streets:
-A class for working with street data.
-
-"""
-import osmnx as ox
-import geopandas as gpd
-import pandas as pd
-from sloyka.src.utils.constants import (
- GLOBAL_CRS,
- GLOBAL_METRIC_CRS,
-)
-from shapely.ops import transform
-from tqdm import tqdm
-import requests
-import sys
-import datetime
-import time
-import osm2geojson
-import random
-from typing import List, Optional
-from osmapi import OsmApi
-import networkx as nx
-from loguru import logger
-
-
-
-[docs]
-class VKParser:
- API_VERISON = "5.131"
- COUNT_ITEMS = 100
- # SLEEP_TIME = 0.5
- TIMEOUT_LIMIT = 15
-
-
-[docs]
- @staticmethod
- def get_group_name(domain, access_token):
- params = {"group_id": domain, "access_token": access_token, "v": VKParser.API_VERISON}
- response = requests.get("https://api.vk.com/method/groups.getById", params=params) # TODO: move this request higher up
- data = response.json()
- if "response" in data and data["response"]:
- group_name = data["response"][0]["name"]
- return pd.DataFrame({"group_name": [group_name]})
- else:
- print("Error while fetching group name:", data)
- return pd.DataFrame({"group_name": [None]})
-
-
-
-[docs]
- @staticmethod
- def get_owner_id_by_domain(domain, access_token):
- """
- Get the owner ID of a VK group by its domain.
-
- Args:
- domain (str): The domain of the VK group.
- access_token (str): The access token for the VK API.
-
- Returns:
- int: The owner ID of the VK group, or None if the request was not successful.
- """
- url = "https://api.vk.com/method/wall.get"
- params = {
- "domain": domain,
- "access_token": access_token,
- "v": VKParser.API_VERISON,
- }
- response = requests.get(url, params=params)
- if response.ok:
- owner_id = response.json()["response"]["items"][0]["owner_id"]
- else:
- owner_id = None
- return owner_id
-
-
-
-[docs]
- @staticmethod
- def get_group_post_ids(domain, access_token, post_num_limit, step) -> list:
- """
- A static method to retrieve a list of post IDs for a given group, based on the owner ID,
- access token, post number limit, and step size. Returns a list of post IDs.
- """
- offset = 0
- post_ids = []
-
- while offset < post_num_limit:
- print(offset, " | ", post_num_limit, end="\r")
- res = requests.get(
- "https://api.vk.com/method/wall.get",
- params={
- "access_token": access_token,
- "v": VKParser.API_VERISON,
- "domain": domain,
- "count": step,
- "offset": offset,
- },
- timeout=10,
- ).json()["response"]
- # print(res.json().keys())
- time.sleep(random.random())
-
- post_ids_new = [k["id"] for k in res["items"]]
- post_ids += post_ids_new
- offset += step
-
- return post_ids
-
-
-
-[docs]
- @staticmethod
- def get_subcomments(owner_id, post_id, access_token, params):
- """
- Retrieves subcomments from the VK API.
-
- Args:
- owner_id (int): The ID of the owner of the comments.
- post_id (int): The ID of the post.
- access_token (str): The access token for authentication.
- params (dict): Additional parameters for the API request.
-
- Returns:
- list: A list of subcomments retrieved from the API.
- """
- subcomments = []
-
- response = requests.get("https://api.vk.com/method/wall.getComments", params=params)
- # print(response.json().keys())
- time.sleep(random.random())
- data = response.json()
-
- if "response" in data:
- for item in data["response"]["items"]:
- item["date"] = datetime.datetime.utcfromtimestamp(item["date"]).strftime("%Y-%m-%d %H:%M:%S")
- if "likes" in item:
- item["likes.count"] = item["likes"]["count"]
- subcomments.append(item)
-
- return subcomments
-
-
-
-[docs]
- @staticmethod
- def get_comments(owner_id, post_id, access_token):
- """
- Get comments for a post on VK using the specified owner ID, post ID, and access token.
-
- Parameters:
- owner_id (int): The ID of the post owner.
- post_id (int): The ID of the post.
- access_token (str): The access token for authentication.
-
- Returns:
- list: A list of dictionaries containing comment information.
- """
- params = {
- "owner_id": owner_id,
- "post_id": post_id,
- "access_token": access_token,
- "v": VKParser.API_VERISON,
- "extended": 1,
- "count": 100,
- "need_likes": 1,
- }
-
- comments = []
-
- response = requests.get("https://api.vk.com/method/wall.getComments", params=params)
- # print(response.json().keys())
- time.sleep(random.random())
- data = response.json()
-
- if "response" in data:
- for item in data["response"]["items"]:
- if item["text"] == "":
- continue
- item["date"] = datetime.datetime.utcfromtimestamp(item["date"]).strftime("%Y-%m-%d %H:%M:%S")
- if "likes" in item:
- item["likes.count"] = item["likes"]["count"]
- comments.append(item)
- if item["thread"]["count"] > 0:
- params["comment_id"] = item["id"]
- subcomments = VKParser.get_subcomments(owner_id, post_id, access_token, params)
- comments.extend(subcomments)
- return comments
-
-
-
-[docs]
- @staticmethod
- def comments_to_dataframe(comments):
- """
- Convert comments to a DataFrame.
-
- Args:
- comments: List of comments to be converted.
-
- Returns:
- DataFrame: A DataFrame containing specific columns from the input comments.
- """
- df = pd.DataFrame(comments)
- df = df[["id", "from_id", "date", "text", "post_id", "parents_stack", "likes.count"]]
- return df
-
-
-
-[docs]
- @staticmethod
- def run_posts(domain, access_token, cutoff_date, number_of_messages=float("inf"), step=50):
- """
- A function to retrieve posts from a social media API based on specified parameters.
-
- Parameters:
- domain (str): The domain (short address) of the group whose posts are being retrieved.
- access_token (str): The authentication token for accessing the API.
- cutoff_date (str): The date to stop retrieving posts (format: '%Y-%m-%d').
- number_of_messages (float): The maximum number of messages to retrieve (default is infinity).
- step (int): The number of posts to retrieve in each API call.
-
- Returns:
- pandas.DataFrame: A DataFrame containing the retrieved posts.
- """
-
- offset = 0
- all_posts = []
- if step > number_of_messages:
- step = number_of_messages
- while offset < number_of_messages:
- print(offset, " | ", number_of_messages, end="\r")
-
- response = requests.get(
- "https://api.vk.com/method/wall.get",
- params={
- "access_token": access_token,
- "v": VKParser.API_VERISON,
- "domain": domain,
- "count": step,
- "offset": offset,
- }, timeout=600
- )
- if response.ok:
- # print(response.json().keys())
- data = response.json()["response"]["items"]
- offset += step
- current_posts = pd.json_normalize(data)
- current_posts = current_posts[["date", "id", "text", "views.count", "likes.count", "reposts.count"]]
- current_posts["date"] = [
- datetime.datetime.fromtimestamp(current_posts["date"][i]) for i in range(len(current_posts["date"]))
- ]
- current_posts["type"] = "post"
- all_posts.append(current_posts)
- print(current_posts.date.min())
- if any(current_posts["date"] < datetime.datetime.strptime(cutoff_date, "%Y-%m-%d")):
- print("posts downloaded")
- break
- else:
- continue
- time.sleep(random.random())
- df_posts = pd.concat(all_posts).reset_index(drop=True)
- df_posts = df_posts[df_posts.text.map(lambda x: len(x)) > 0]
- df_posts["text"] = df_posts["text"].str.replace(r"\n", "", regex=True)
- df_posts["link"] = df_posts["text"].str.extract(r"(https://\S+)")
- return df_posts
-
-
-
-[docs]
- @staticmethod
- def run_comments(domain, post_ids, access_token):
- owner_id = VKParser.get_owner_id_by_domain(domain, access_token)
- all_comments = []
- for post_id in tqdm(post_ids):
- comments = VKParser.get_comments(owner_id, post_id, access_token)
- all_comments.extend(comments)
- df = VKParser.comments_to_dataframe(all_comments)
- df["type"] = "comment"
- df = df.reset_index(drop=True)
- print("comments downloaded")
- return df
-
-
-
-[docs]
- @staticmethod
- def run_parser(domain, access_token, cutoff_date, number_of_messages=float("inf"), step=100):
- """
- Runs the parser with the given parameters and returns a combined DataFrame of posts and comments.
-
- :param domain: The domain (short address) of the VK group.
- :param access_token: The user token for authentication.
- :param step: The step size for fetching data.
- :param cutoff_date: The cutoff date for fetching data.
- :param number_of_messages: The maximum number of messages to fetch. Defaults to positive infinity.
- :return: A combined DataFrame of posts and comments.
- """
- owner_id = VKParser.get_owner_id_by_domain(domain, access_token)
- df_posts = VKParser.run_posts(domain=owner_id, access_token=access_token, step=step, cutoff_date=cutoff_date, number_of_messages=number_of_messages)
- post_ids = df_posts["id"].tolist()
-
- df_comments = VKParser.run_comments(domain=owner_id, post_ids=post_ids, access_token=access_token)
- df_comments.loc[df_comments["parents_stack"].apply(lambda x: len(x) > 0), "type"] = "reply"
- for i in range(len(df_comments)):
- tmp = df_comments["parents_stack"].iloc[i]
- if tmp is not None:
- if len(tmp) > 0:
- df_comments["parents_stack"].iloc[i] = tmp[0]
- else:
- df_comments["parents_stack"].iloc[i] = None
-
- df_combined = df_comments.join(df_posts, on="post_id", rsuffix="_post")
- df_combined = pd.concat([df_posts, df_comments], ignore_index=True)
- df_group_name = VKParser.get_group_name(domain, access_token)
- df_combined["group_name"] = df_group_name["group_name"][0]
-
- return df_combined
-
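A usage sketch; the group domain and token are placeholders, and a real VK API access token with wall-read permissions is required.

df = VKParser.run_parser(
    domain="some_city_group",        # placeholder group short address
    access_token="VK_ACCESS_TOKEN",  # placeholder token
    cutoff_date="2023-01-01",
    number_of_messages=500,
    step=100,
)
print(df[["type", "date", "text"]].head())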
-
-
-import pandas as pd
-import numpy as np
-from flair.models import SequenceTagger
-from flair.data import Sentence
-from rapidfuzz import fuzz
-from typing import List
-from sloyka.src.utils.constants import CITY_SERVICES_NAMES
-
-tagger = SequenceTagger.load("Glebosol/city_services")
-
-
-
-[docs]
-class City_services:
-
-[docs]
- @staticmethod
- def extraction_services(text):
- sentence = Sentence(text)
- tagger.predict(sentence)
- entities = sentence.get_spans("ner")
- entity_names = [entity.text for entity in entities]
- return entity_names
-
-
-
-[docs]
- @staticmethod
- def remove_last_letter(words):
- reduced_words = [word[:-1] for word in words]
- return reduced_words
-
-
- # def replace_with_most_similar(entity_names: List[str], CITY_SERVICES_NAMES: List[str]) -> List[str]:
- # true_city_services_names = [difflib.get_close_matches(word_entity_names, CITY_SERVICES_NAMES, n=1, cutoff=0.0)[0] for word_entity_names in entity_names]
- # return true_city_services_names
-
-
-[docs]
- @staticmethod
- def replace_with_most_similar(list_of_entities):
- similarity_matrix = np.zeros((len(list_of_entities), len(CITY_SERVICES_NAMES)))
- for i, word1 in enumerate(list_of_entities):
- for j, word2 in enumerate(CITY_SERVICES_NAMES):
- similarity = fuzz.ratio(word1, word2) / 100.0
- similarity_matrix[i, j] = similarity
- new_list_of_entities = list_of_entities.copy()
- for i in range(len(list_of_entities)):
- max_index = np.argmax(similarity_matrix[i])
- new_list_of_entities[i] = CITY_SERVICES_NAMES[max_index]
- return new_list_of_entities
-
-
-
-[docs]
- def run(self, df, text_column):
- df["City_services_extraced"] = df[text_column].apply(lambda text: City_services.extraction_services(text))
- df["City_services_cuted"] = df["City_services_extraced"].apply(
- lambda row: City_services.remove_last_letter(row)
- )
- df["City_services"] = df["City_services_cuted"].apply(lambda row: City_services.replace_with_most_similar(row))
- df.drop("City_services_cuted", axis=1, inplace=True)
- return df
-
-
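A usage sketch, assuming a DataFrame with a free-text column; the column name and sample text are illustrative.

import pandas as pd

df = pd.DataFrame({"text": ["Поликлиника на Ленинском работает только до обеда"]})

extractor = City_services()
result = extractor.run(df, text_column="text")
print(result[["text", "City_services"]])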
-
-import networkx as nx
-import folium
-import random
-from folium.plugins import MarkerCluster
-
-
-
-[docs]
-def draw_graph_folium(graph_path, output_file=None):
- """
- Visualizes a graph from the given graph_path using Folium and MarkerCluster.
-
- Args:
- graph_path (str): The path to the graphml file.
- output_file (str, optional): The file to save the visualization to. Defaults to None.
-
- Returns:
- folium.Map: The folium map object representing the visualized graph.
- """
- G = nx.read_graphml(graph_path)
-
- color_mapping = {
- -1: "gray",
- 0: "blue",
- 1: "green",
- 2: "purple",
- 3: "cyan",
- 4: "brown",
- 5: "orange",
- 6: "pink",
- 7: "darkred",
- 8: "yellow",
- 9: "beige",
- 10: "darkgreen",
- 11: "lightgreen",
- 12: "darkblue",
- 13: "lightblue",
- 14: "darkpurple",
- 15: "cadetblue",
- 16: "red",
- 17: "lightgreen",
- 18: "lightblue",
- }
-
- target_clusters = range(1, 19)
-
- m = folium.Map(
- location=[59.9343, 30.3351],
- zoom_start=10,
- tiles="cartodbdark_matter",
- control_scale=True,
- )
-
- for c in target_clusters:
- mc = MarkerCluster(name=f"{c} | cluster")
-
- for node, data in G.nodes(data=True):
- if "Lat" in data and "Lon" in data:
- main_node_location = [data["Lat"], data["Lon"]]
- for n in G.neighbors(node):
- if "Cluster" in G.nodes[n] and G.nodes[n]["Cluster"] == c:
- neighbor_data = G.nodes[n]
- neighbor_location = [
- main_node_location[0] + random.uniform(-0.0008, 0.0008), # folium expects [lat, lon]
- main_node_location[1] + random.uniform(-0.0008, 0.0008),
- ]
- folium.CircleMarker(
- location=neighbor_location,
- radius=10,
- color=color_mapping[G.nodes[n]["Cluster"]],
- fill=True,
- fill_color=color_mapping[G.nodes[n]["Cluster"]],
- popup=neighbor_data,
- name=f'cluster_{G.nodes[n]["Cluster"]}',
- ).add_to(mc)
- mc.add_to(m)
-
- folium.LayerControl().add_to(m)
- if output_file is not None:
- m.save(output_file)
-
- return m
-
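A usage sketch, assuming a graphml file produced by the semantic graph module whose nodes carry "Lat", "Lon" and "Cluster" attributes; the file names are placeholders.

m = draw_graph_folium("city_semantic_graph.graphml", output_file="city_graph_map.html")
m  # in a notebook this renders the interactive map; the HTML copy is saved alongside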
-