diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..db2a0ee --- /dev/null +++ b/.gitignore @@ -0,0 +1,126 @@ +outputs/ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ +docs/_static +docs/_templates + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ diff --git a/Scweet.egg-info/PKG-INFO b/Scweet.egg-info/PKG-INFO deleted file mode 100644 index eb10a02..0000000 --- a/Scweet.egg-info/PKG-INFO +++ /dev/null @@ -1,173 +0,0 @@ -Metadata-Version: 2.1 -Name: Scweet -Version: 1.8 -Summary: Tool for scraping Tweets -Home-page: https://github.com/Altimis/Scweet -Author: Yassine AIT JEDDI and Soufiane Bengadi -Author-email: aitjeddiyassine@gmail.com -License: MIT -Download-URL: https://github.com/Altimis/Scweet/archive/v0.3.0.tar.gz -Keywords: twitter,scraper,python,crawl,following,followers,twitter-scraper,tweets -Platform: UNKNOWN -Classifier: Development Status :: 4 - Beta -Classifier: Intended Audience :: Developers -Classifier: Topic :: Software Development :: Build Tools -Classifier: License :: OSI Approved :: MIT License -Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.4 -Classifier: Programming Language :: Python :: 3.5 -Classifier: Programming Language :: Python :: 3.6 -Classifier: Programming Language :: Python :: 3.7 -Classifier: Programming Language :: Python :: 3.8 -Description-Content-Type: text/markdown -License-File: LICENSE.txt - - -# A simple and unlimited twitter scraper with python. - -In the last days, Twitter banned almost every twitter scrapers. 
This repository represent an alternative tool to scrap tweets between two given dates (since and until), for a given language and list of words or account name, and saves a csv file containing retrieved data : - -``[UserScreenName, UserName, Timestamp, Text, Embedded_text, Emojis, Comments, Likes, Retweets, Image link, Tweet URL]`` - -It is also possible to download and save the images from ``Image link`` by passing the argument ``save_images = True``, If you only want to scrape images, I recommand to set the argument ``display_type = image`` to show only tweets that contain images. - -You can scrape user profile information as well, including following and followers. - -Authentification is required in the case of followers/following scraping. It is recommended to log in with a new account (if the list of followers is very long, it is possible that your account will be banned). To log in to your account, you need to enter your username ``SCWEET_USERNAME`` and password ``SCWEET_PASSWORD`` in [env](https://github.com/Altimis/Scweet/blob/master/.env) file. You can controle the ``wait`` parameter in the ``get_users_followers`` and ``get_users_following`` functions. - -The [user](https://github.com/Altimis/Scweet/blob/master/Scweet/user.py) code allows you to get all information of a list of users, including location, join date and lists of **followers and following**. Check [this example](https://github.com/Altimis/Scweet/blob/master/Scweet/Example.py). - -## Requierments : - -```pip install -r requirements.txt``` - -Note : You need to have Chrome installed in your system - -## Results : - -### Tweets : - -The CSV file contains the following features (for each tweet) : - -- 'UserScreenName' : -- 'UserName' : UserName -- 'Timestamp' : timestamp of the tweet -- 'Text' : tweet text -- 'Embedded_text' : embedded text written above the tweet. It could be an image, video or even another tweet if the tweet in question is a reply. 
-- 'Emojis' : emojis existing in tweet -- 'Comments' : number of comments -- 'Likes' : number of likes -- 'Retweets' : number of retweets -- 'Image link' : Link of the image in the tweet -- 'Tweet URL' : Tweet URL. - -### Following / Followers : - -The ``get_users_following`` and ``get_users_followers`` in [user](https://github.com/Altimis/Scweet/blob/master/Scweet/user.py) give a list of following and followers for a given list of users. - -**More features will be added soon, such as "all reaplies of each tweet for a specific twitter account"** - -## Usage : - -### Library : - -The library is now available. To install the library, run : - -``pip install Scweet==1.6`` - -After installing, you can use it as follow : - -``` -from Scweet.scweet import scrape -from Scweet.user import get_user_information, get_users_following, get_users_followers`` -``` - -**scrape top tweets with the words 'bitcoin','ethereum' geolocated less than 200 km from Alicante (Spain) Lat=38.3452, Long=-0.481006 and without replies.** -**the process is slower as the interval is smaller (choose an interval that can divide the period of time betwee, start and max date)** - -``` -data = scrape(words=['bitcoin','ethereum'], since="2021-10-01", until="2021-10-05", from_account = None, interval=1, headless=False, display_type="Top", save_images=False, lang="en", - resume=False, filter_replies=False, proximity=False, geocode="38.3452,-0.481006,200km") -``` - -**scrape top tweets of with the hashtag #bitcoin, in proximity and without replies.** -**the process is slower as the interval is smaller (choose an interval that can divide the period of time betwee, start and max date)** - -``` -data = scrape(hashtag="bitcoin", since="2021-08-05", until=None, from_account = None, interval=1, - headless=True, display_type="Top", save_images=False, - resume=False, filter_replies=True, proximity=True) -``` - -**Get the main information of a given list of users** -**These users belongs to my following.** - -``` 
-users = ['nagouzil', '@yassineaitjeddi', 'TahaAlamIdrissi', - '@Nabila_Gl', 'geceeekusuu', '@pabu232', '@av_ahmet', '@x_born_to_die_x'] -``` - -**this function return a list that contains : ** -**["nb of following","nb of followers", "join date", "birthdate", "location", "website", "description"]** - -``` -users_info = get_user_information(users, headless=True) -``` - -**Get followers and following of a given list of users** -**Enter your username and password in .env file. I recommande you dont use your main account.** -**Increase wait argument to avoid banning your account and maximise the crawling process if the internet is slow. I used 1 and it's safe.** - -**set your .env file with SCWEET_EMAIL, SCWEET_USERNAME and SCWEET_PASSWORD variables and provide its path** - -``` -env_path = ".env" - -following = get_users_following(users=users, env=env_path, verbose=0, headless=True, wait=2, limit=50, file_path=None) - -followers = get_users_followers(users=users, env=env_path, verbose=0, headless=True, wait=2, limit=50, file_path=None) -``` - -### Terminal : - -``` -Scrape tweets. - -optional arguments: - -h, --help show this help message and exit - --words WORDS Words to search. they should be separated by "//" : Cat//Dog. - --from_account FROM_ACCOUNT - Tweets posted by "from_account" account. - --to_account TO_ACCOUNT - Tweets posted in response to "to_account" account. - --mention_account MENTION_ACCOUNT - Tweets mention "mention_account" account. - --hashtag HASHTAG - Tweets containing #hashtag - --until UNTIL max date for search query. example : %Y-%m-%d. - --since SINCE - Start date for search query. example : %Y-%m-%d. - --interval INTERVAL Interval days between each start date and end date for - search queries. example : 5. - --lang LANG tweets language. Example : "en" for english and "fr" - for french. - --headless HEADLESS Headless webdrives or not. 
True or False - --limit LIMIT Limit tweets per - --display_type DISPLAY_TYPE - Display type of twitter page : Latest or Top tweets - --resume RESUME Resume the last scraping. specify the csv file path. - --proxy PROXY Proxy server - --proximity PROXIMITY Proximity - --geocode GEOCODE Geographical location coordinates to center the - search (), radius. No compatible with proximity - --minreplies MINREPLIES - Min. number of replies to the tweet - --minlikes MINLIKES Min. number of likes to the tweet - --minretweets MINRETWEETS - Min. number of retweets to the tweet - -### To execute the script : -python scweet.py --words "excellente//car" --to_account "tesla" --until 2020-01-05 --since 2020-01-01 --limit 10 --interval 1 --display_type Latest --lang="en" --headless True -``` - - diff --git a/Scweet.egg-info/SOURCES.txt b/Scweet.egg-info/SOURCES.txt deleted file mode 100644 index fbe3ccd..0000000 --- a/Scweet.egg-info/SOURCES.txt +++ /dev/null @@ -1,15 +0,0 @@ -LICENSE.txt -README.md -setup.cfg -setup.py -Scweet/__init__.py -Scweet/__version__.py -Scweet/const.py -Scweet/scweet.py -Scweet/user.py -Scweet/utils.py -Scweet.egg-info/PKG-INFO -Scweet.egg-info/SOURCES.txt -Scweet.egg-info/dependency_links.txt -Scweet.egg-info/requires.txt -Scweet.egg-info/top_level.txt \ No newline at end of file diff --git a/Scweet.egg-info/dependency_links.txt b/Scweet.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/Scweet.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/Scweet.egg-info/requires.txt b/Scweet.egg-info/requires.txt deleted file mode 100644 index 1c799b2..0000000 --- a/Scweet.egg-info/requires.txt +++ /dev/null @@ -1,5 +0,0 @@ -selenium -pandas -python-dotenv -chromedriver-autoinstaller -urllib3 diff --git a/Scweet.egg-info/top_level.txt b/Scweet.egg-info/top_level.txt deleted file mode 100644 index 199a3ab..0000000 --- a/Scweet.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -Scweet diff --git 
# Every tweet (the focal tweet and each reply) is rendered as one of these.
_TWEETS_XPATH = '//article[@data-testid="tweet"]'
# Page-level link whose text names the posting client.
_AGENT_XPATH = '//a[contains(@href, "help.twitter.com/using-twitter/how-to-tweet")]//span'
# Emoji images are named after their code point(s) in lowercase hex,
# hyphen-joined for multi-code-point sequences.
_EMOJI_SRC_RE = re.compile(r'svg/([a-f0-9]+(?:-[a-f0-9]+)*)\.svg')
# Sentinel: "parse_card should look the agent up itself" (None is a valid agent).
_LOOKUP_AGENT = object()


def get_replies_from_tweets(
        urls: List[str],
        headless: bool=True,
        proxy=None,
        show_images: bool=False,
        option=None,
        firefox: bool=False,
        env=None,
    ) -> List[Dict[str, Any]]:
    """
    Collect the tweets shown on each of the given tweet pages.

    A driver is created with ``init_driver`` (all keyword arguments are
    forwarded unchanged), every URL is scraped with ``get_replies`` and the
    per-URL results are concatenated in order.

    The driver is created here and never handed back to the caller, so it is
    shut down before returning, even when scraping raises.
    """
    driver = init_driver(
        headless=headless,
        proxy=proxy,
        show_images=show_images,
        option=option,
        firefox=firefox,
        env=env,
    )
    try:
        driver.get('https://twitter.com')
        replies = []
        for url in urls:
            replies.extend(get_replies(url, driver))
        return replies
    finally:
        driver.quit()


def close_tab(driver):
    """
    Best-effort: close the current tab and focus the first window.

    The tab is only closed when more than one window is open, so the last
    remaining window is never closed. Failures are reported on stdout and
    otherwise ignored.
    """
    try:
        if len(driver.window_handles) > 1:
            driver.close()
    except Exception:
        print("Cannot close tab!")
    try:
        driver.switch_to.window(driver.window_handles[0])
    except Exception:
        print("Cannot change focus!")


def open_tab(driver):
    """Open a blank tab and switch the driver's focus to it."""
    driver.execute_script('window.open("");')
    driver.switch_to.window(driver.window_handles[1])


def _expand_replies(driver, max_clicks: int=20) -> bool:
    """
    Click the last "Show ..." element up to ``max_clicks`` times.

    Returns False when an unexpected error interrupts the expansion (the
    caller then gives up on the page), True otherwise.
    """
    for _ in range(max_clicks):
        try:
            show_els = driver.find_elements(By.XPATH, "//span[contains(text(), 'Show')]")
            if not show_els:
                break
            driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
            time.sleep(random.uniform(0.5, 1.5))
            show_els[-1].click()
            time.sleep(random.uniform(0.5, 1.5))
        except NoSuchElementException:
            break
        except Exception:
            return False
    print("Loaded all tweets.")
    return True


def _find_agent(driver):
    """
    Return the text of the page's posting-client link, or None.

    Waits up to 10 seconds for the element; problems are printed and
    reported as None.
    """
    try:
        agent_el = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, _AGENT_XPATH))
        )
        return agent_el.text
    except TimeoutException as te:
        print("Timeout!")
        print(te)
    except Exception as e:
        print("Encountered exception!")
        print(e)
    return None


def _link_thread(infos: List[Dict[str, Any]], tweet_url: str) -> None:
    """
    Add ``root_url``, ``thread_url`` and ``prev_url`` to each info in place.

    ``root_url`` is the URL of the first card, ``thread_url`` is the page that
    was scraped and ``prev_url`` is the URL of the card directly before this
    one (None for the first card).
    """
    if not infos:
        return
    root_url = infos[0]['url']
    prev_url = None
    for info in infos:
        info['root_url'] = root_url
        info['thread_url'] = tweet_url
        info['prev_url'] = prev_url
        prev_url = info['url']


def get_replies(tweet_url: str, driver) -> List[Dict[str, Any]]:
    """
    Scrape the tweet cards displayed on ``tweet_url`` in a temporary tab.

    Returns one dict per card (see ``parse_card``) extended with
    ``root_url``, ``thread_url`` and ``prev_url``. Cards for which no
    timestamp could be read are dropped. An empty list is returned when the
    page does not load in time, no tweet card appears, or expanding the
    replies fails.

    The tab opened here is closed, and focus returned to the first window, on
    every exit path.
    """
    print(tweet_url)
    open_tab(driver)
    driver.set_page_load_timeout(5)
    try:
        try:
            driver.get(tweet_url)
        except TimeoutException as te:
            print("Failed to get tweet")
            print(te)
            return []

        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, _TWEETS_XPATH))
            )
        except Exception:
            return []

        if not _expand_replies(driver):
            return []

        cards = driver.find_elements(by=By.XPATH, value=_TWEETS_XPATH)
        if not cards:
            return []
        print(f"Found {len(cards)} tweets.")

        # The agent lookup is page-level, so do it once instead of once per
        # card (each failed lookup waits 10 seconds).
        agent = _find_agent(driver)
        infos = [parse_card(card, driver, agent=agent) for card in cards]
        _link_thread(infos, tweet_url)
        return [info for info in infos if info['timestamp'] is not None]
    finally:
        close_tab(driver)


def _first_text(card, xpath: str, default=None):
    """Return the text of the first element matching ``xpath``, else ``default``."""
    try:
        return card.find_element(by=By.XPATH, value=xpath).text
    except Exception:
        return default


def _emoji_from_src(src) -> str:
    """
    Decode an emoji image URL into the character(s) it depicts.

    Returns an empty string when ``src`` is empty or does not look like an
    emoji SVG named after hexadecimal code points.
    """
    match = _EMOJI_SRC_RE.search(src or '')
    if match is None:
        return ''
    try:
        return ''.join(chr(int(cp, 16)) for cp in match.group(1).split('-'))
    except (ValueError, OverflowError):
        # Hex value outside the valid Unicode range.
        return ''


def parse_card(card, driver, agent=_LOOKUP_AGENT):
    """
    Extract the fields of one tweet card into a dict.

    Keys: ``timestamp``, ``username``, ``handle``, ``text``,
    ``embedded_text``, ``replies_str``, ``retweets_str``, ``likes_str``,
    ``image_links``, ``emojis``, ``url``, ``agent`` and, only for promoted
    tweets, ``promoted``. Fields that cannot be read fall back to None (the
    three counters fall back to ``'0'``, ``image_links`` to ``[]``).

    ``agent`` may be supplied by a caller that already looked it up for the
    page; when omitted it is looked up here via ``driver``. The lookup is not
    relative to ``card``, so every card of a page gets the same value.
    """
    info = {}

    # Cards without a <time> element get timestamp None; get_replies drops
    # those cards from its result.
    try:
        info['timestamp'] = card.find_element(by=By.XPATH, value='.//time').get_attribute('datetime')
    except Exception:
        info['timestamp'] = None

    info['username'] = _first_text(card, './/span')
    info['handle'] = _first_text(card, './/span[contains(text(), "@")]')
    info['text'] = _first_text(card, './/div[@data-testid="tweetText"]')
    info['embedded_text'] = _first_text(card, './/div[2]/div[2]/div[2]')

    info['replies_str'] = _first_text(card, './/div[@data-testid="reply"]', '0')
    info['retweets_str'] = _first_text(card, './/div[@data-testid="retweet"]', '0')
    info['likes_str'] = _first_text(card, './/div[@data-testid="like"]', '0')

    try:
        image_els = card.find_elements(
            by=By.XPATH,
            value='.//div[2]/div[2]//img[contains(@src, "https://pbs.twimg.com/")]',
        )
        image_links = [el.get_attribute('src') for el in image_els]
    except Exception:
        image_links = []
    info['image_links'] = image_links

    # handle promoted tweets ("*[last()]": a location step needs a node test
    # before its predicate, a bare "[last()]" is not valid XPath)
    if _first_text(card, './/div[2]/div[2]/*[last()]//span') == "Promoted":
        info['promoted'] = True

    # get a string of all emojis contained in the tweet
    try:
        emoji_tags = card.find_elements(by=By.XPATH, value='.//img[contains(@src, "emoji")]')
    except Exception:
        emoji_tags = []
    decoded = (_emoji_from_src(tag.get_attribute('src')) for tag in emoji_tags)
    info['emojis'] = ' '.join(emoji for emoji in decoded if emoji)

    # tweet url
    try:
        element = card.find_element(by=By.XPATH, value='.//a[contains(@href, "/status/")]')
        info['url'] = element.get_attribute('href')
    except Exception:
        info['url'] = None

    info['agent'] = _find_agent(driver) if agent is _LOOKUP_AGENT else agent
    return info
@@ -25,7 +25,7 @@ def scrape(since, until=None, words=None, to_account=None, from_account=None, me # ------------------------- Variables : # header of csv - header = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', 'Comments', 'Likes', 'Retweets', + header = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', 'Comments', 'Retweets', 'Likes', 'Image link', 'Tweet URL'] # list that contains all data data = [] @@ -65,7 +65,7 @@ def scrape(since, until=None, words=None, to_account=None, from_account=None, me if not os.path.exists(save_dir): os.makedirs(save_dir) # show images during scraping (for saving purpose) - if save_images == True: + if save_images: show_images = True # initiate the driver driver = init_driver(headless, proxy, show_images) @@ -86,9 +86,9 @@ def scrape(since, until=None, words=None, to_account=None, from_account=None, me # number of scrolls scroll = 0 # convert and to str - if type(since) != str : + if not isinstance(since, str): since = datetime.datetime.strftime(since, '%Y-%m-%d') - if type(until_local) != str : + if not isinstance(until_local, str): until_local = datetime.datetime.strftime(until_local, '%Y-%m-%d') # log search page between and path = log_search_page(driver=driver, words=words, since=since, @@ -112,30 +112,37 @@ def scrape(since, until=None, words=None, to_account=None, from_account=None, me # sleep sleep(random.uniform(0.5, 1.5)) # start scrolling and get tweets - driver, data, writer, tweet_ids, scrolling, tweet_parsed, scroll, last_position = \ - keep_scroling(driver, data, writer, tweet_ids, scrolling, tweet_parsed, limit, scroll, last_position) + ( + driver, data, writer, tweet_ids, scrolling, tweet_parsed, scroll, last_position + ) = keep_scroling( + driver, data, writer, tweet_ids, scrolling, tweet_parsed, limit, scroll, + last_position, get_agent=get_agent + ) # keep updating and for every search - if type(since) == str: + if isinstance(since, str): since = 
datetime.datetime.strptime(since, '%Y-%m-%d') + datetime.timedelta(days=interval) else: since = since + datetime.timedelta(days=interval) - if type(since) != str: + if isinstance(until_local, str): until_local = datetime.datetime.strptime(until_local, '%Y-%m-%d') + datetime.timedelta(days=interval) else: until_local = until_local + datetime.timedelta(days=interval) - data = pd.DataFrame(data, columns = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', - 'Comments', 'Likes', 'Retweets','Image link', 'Tweet URL']) + columns = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', + 'Comments', 'Retweets', 'Likes', 'Image link', 'Tweet URL'] + if get_agent: + columns.append('Agent') + data = pd.DataFrame(data, columns=columns) # save images - if save_images==True: + if save_images: print("Saving images ...") save_images_dir = "images" if not os.path.exists(save_images_dir): os.makedirs(save_images_dir) - dowload_images(data["Image link"], save_images_dir) + download_images(data["Image link"], save_images_dir) # close the web driver driver.close() @@ -206,7 +213,7 @@ def scrape(since, until=None, words=None, to_account=None, from_account=None, me geocode = args.geocode minreplies = args.minreplies minlikes = args.minlikes - minretweets = args.minlikes + minretweets = args.minretweets data = scrape(since=since, until=until, words=words, to_account=to_account, from_account=from_account, mention_account=mention_account, hashtag=hashtag, interval=interval, lang=lang, headless=headless, limit=limit, diff --git a/Scweet/user.py b/Scweet/user.py index 40a5f3b..bd0d1df 100644 --- a/Scweet/user.py +++ b/Scweet/user.py @@ -2,9 +2,10 @@ from time import sleep import random import json +from selenium.webdriver.common.by import By -def get_user_information(users, driver=None, headless=True): +def get_user_information(users, driver=None, headless=True, with_extras: bool=False): """ get user information if the "from_account" 
argument is specified """ driver = utils.init_driver(headless=headless) @@ -15,79 +16,94 @@ def get_user_information(users, driver=None, headless=True): log_user_page(user, driver) - if user is not None: - - try: - following = driver.find_element_by_xpath( - '//a[contains(@href,"/following")]/span[1]/span[1]').text - followers = driver.find_element_by_xpath( - '//a[contains(@href,"/followers")]/span[1]/span[1]').text - except Exception as e: - # print(e) - return - - try: - element = driver.find_element_by_xpath('//div[contains(@data-testid,"UserProfileHeader_Items")]//a[1]') - website = element.get_attribute("href") - except Exception as e: - # print(e) - website = "" - - try: - desc = driver.find_element_by_xpath('//div[contains(@data-testid,"UserDescription")]').text - except Exception as e: - # print(e) - desc = "" - a = 0 - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[3]').text - birthday = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[2]').text - location = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - except Exception as e: - # print(e) - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[2]').text - span1 = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - if hasNumbers(span1): - birthday = span1 - location = "" - else: - location = span1 - birthday = "" - except Exception as e: - # print(e) - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - birthday = "" - location = "" - except Exception as e: - # print(e) - join_date = "" - birthday = "" - location = "" - print("--------------- " + user + " information : ---------------") - print("Following : ", following) - print("Followers : ", followers) 
- print("Location : ", location) - print("Join date : ", join_date) - print("Birth date : ", birthday) - print("Description : ", desc) - print("Website : ", website) - users_info[user] = [following, followers, join_date, birthday, location, website, desc] - - if i == len(users) - 1: - driver.close() - return users_info - else: - print("You must specify the user") + if user is None: + print('You must specify a user.') continue + try: + following = driver.find_element_by_xpath( + '//a[contains(@href,"/following")]/span[1]/span[1]').text + followers = driver.find_element_by_xpath( + '//a[contains(@href,"/followers")]/span[1]/span[1]').text + except Exception as e: + following, followers = '', '' + + try: + website_el = driver.find_element(By.XPATH, value="//a[contains(@data-testid,'UserUrl')]/span") + website = website_el.text + except Exception as e: + website = "" + + try: + desc = driver.find_element_by_xpath('//div[contains(@data-testid,"UserDescription")]').text + except Exception as e: + desc = "" + a = 0 + try: + join_date_el = driver.find_element(By.XPATH, value="//span[contains(@data-testid,'UserJoinDate')]/span[contains(.,'Joined ')]") + join_date = join_date_el.text + except Exception as e: + join_date = "" + try: + birthday_el = driver.find_element(By.XPATH, value="//span[contains(@data-testid,'UserBirthdate') and contains(.,'Born ')]") + birthday = birthday_el.text + except Exception as e: + birthday = "" + try: + location_el = driver.find_element(By.XPATH, value="//span[contains(@data-testid,'UserLocation')]/span/span") + location = location_el.text + except Exception as e: + location = "" + try: + profile_photo_link = driver.find_element(By.XPATH, "//img[contains(@src, 'profile_images')]").get_attribute('src') + except Exception as e: + profile_photo_link = '' + try: + banner_photo_link = driver.find_element(By.XPATH, "//img[contains(@src, 'profile_banners')]").get_attribute('src') + except Exception as e: + banner_photo_link = '' + + + prefixes = { + 
'Joined ': 'join_date', + 'Born ': 'birthday', + } + fields = { + 'join_date': join_date, 'birthday': birthday, 'location': location, + 'desc': desc, 'website': website, 'profile_photo_link': profile_photo_link, + 'banner_photo_link': banner_photo_link, + } + swapped_fields = {} + for field, val in fields.items(): + for prefix, true_field in prefixes.items(): + if val.startswith(prefix): + swapped_fields[field] = fields[true_field] + for field, val in swapped_fields.items(): + # old_val = fields[field] + fields[field] = val + + join_date, birthday, location, desc, website = ( + fields['join_date'], fields['birthday'], fields['location'], + fields['desc'], fields['website'], + ) + + + print("--------------- " + user + " information : ---------------") + print("Following : ", following) + print("Followers : ", followers) + print("Location : ", location) + print("Join date : ", join_date) + print("Birth date : ", birthday) + print("Description : ", desc) + print("Website : ", website) + users_info[user] = [following, followers, join_date, birthday, location, website, desc] + if with_extras: + users_info[user] += [profile_photo_link, banner_photo_link] + + if i == len(users) - 1: + driver.close() + return users_info + def log_user_page(user, driver, headless=True): sleep(random.uniform(1, 2)) diff --git a/Scweet/utils.py b/Scweet/utils.py index 3048cee..9ae7e0f 100644 --- a/Scweet/utils.py +++ b/Scweet/utils.py @@ -5,7 +5,7 @@ import random import chromedriver_autoinstaller import geckodriver_autoinstaller -from selenium.common.exceptions import NoSuchElementException +from selenium.common.exceptions import NoSuchElementException, TimeoutException from selenium import webdriver from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.firefox.options import Options as FirefoxOptions @@ -20,13 +20,14 @@ from selenium.webdriver.common.by import By from . 
import const import urllib +from typing import Union from .const import get_username, get_password, get_email # current_dir = pathlib.Path(__file__).parent.absolute() -def get_data(card, save_images=False, save_dir=None): +def get_data(card, save_images=False, save_dir=None, driver=None, get_agent=False): """Extract data from tweet card""" image_links = [] @@ -35,10 +36,6 @@ def get_data(card, save_images=False, save_dir=None): except: return - try: - handle = card.find_element(by=By.XPATH, value='.//span[contains(text(), "@")]').text - except: - return try: postdate = card.find_element(by=By.XPATH, value='.//time').get_attribute('datetime') @@ -114,10 +111,66 @@ def get_data(card, save_images=False, save_dir=None): except: return - tweet = ( - username, handle, postdate, text, embedded, emojis, reply_cnt, retweet_cnt, like_cnt, image_links, tweet_url) - return tweet + handle = '@' + tweet_url.split('twitter.com/')[1].split('/status/')[0] + + agent = None + if get_agent and driver is not None: + agent = get_agent_str(driver, tweet_url) + + tweet = [ + username, handle, postdate, text, embedded, emojis, + reply_cnt, retweet_cnt, like_cnt, image_links, tweet_url + ] + if agent is not None: + tweet.append(agent) + return tuple(tweet) + + +def get_agent_str(driver, tweet_url: str) -> str: + """ + Get the agent string (e.g. "Twitter for Android"). + Returns an empty string if the agent can't be extracted. 
+ """ + driver.execute_script('window.open("");') + driver.switch_to.window(driver.window_handles[1]) + driver.set_page_load_timeout(5) + try: + driver.get(tweet_url) + except TimeoutException as te: + print("Failed to get tweet") + print(te) + if len(driver.window_handles) > 1: + driver.close() + return '' + + agent_xpath = '//a[contains(@href, "help.twitter.com/using-twitter/how-to-tweet")]//span' + try: + agent_el = WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.XPATH, agent_xpath)) + ) + agent = agent_el.text + except TimeoutException as te: + print("Timeout!") + print(te) + agent = '' + except Exception as e: + print("Encountered exception!") + print(e) + agent = '' + finally: + # driver.find_element_by_tag_name('body').send_keys(Keys.COMMAND + 'w') + try: + if len(driver.window_handles) > 1: + driver.close() + except Exception as e: + print("Cannot close tab!") + try: + driver.switch_to.window(driver.window_handles[0]) + except Exception as e: + print("Cannot change focus!") + # sleep(random.uniform(1.5, 3.5)) + return agent def init_driver(headless=True, proxy=None, show_images=False, option=None, firefox=False, env=None): """ initiate a chromedriver or firefoxdriver instance @@ -268,7 +321,7 @@ def log_in(driver, env, timeout=20, wait=4): def keep_scroling(driver, data, writer, tweet_ids, scrolling, tweet_parsed, limit, scroll, last_position, - save_images=False): + save_images=False, get_agent=False): """ scrolling function for tweets crawling""" save_images_dir = "/images" @@ -282,10 +335,10 @@ def keep_scroling(driver, data, writer, tweet_ids, scrolling, tweet_parsed, limi # get the card of tweets page_cards = driver.find_elements(by=By.XPATH, value='//article[@data-testid="tweet"]') # changed div by article for card in page_cards: - tweet = get_data(card, save_images, save_images_dir) + tweet = get_data(card, save_images, save_images_dir, driver=driver, get_agent=get_agent) if tweet: # check if the tweet is unique - tweet_id = 
''.join(tweet[:-2]) + tweet_id = ''.join(tweet[:-3]) if tweet_id not in tweet_ids: tweet_ids.add(tweet_id) data.append(tweet) @@ -418,7 +471,14 @@ def check_exists_by_xpath(xpath, driver): return True -def dowload_images(urls, save_dir): +def download_images(urls, save_dir): for i, url_v in enumerate(urls): for j, url in enumerate(url_v): urllib.request.urlretrieve(url, save_dir + '/' + str(i + 1) + '_' + str(j + 1) + ".jpg") + + +def dowload_images(*args): + """ + Keep the old misspelled version in case someone relies upon it. + """ + return download_images(*args) diff --git a/build/lib/Scweet/__init__.py b/build/lib/Scweet/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/build/lib/Scweet/__version__.py b/build/lib/Scweet/__version__.py deleted file mode 100644 index fd3808e..0000000 --- a/build/lib/Scweet/__version__.py +++ /dev/null @@ -1,3 +0,0 @@ -VERSION = (1, 8) - -__version__ = '.'.join(map(str, VERSION)) \ No newline at end of file diff --git a/build/lib/Scweet/const.py b/build/lib/Scweet/const.py deleted file mode 100644 index 2efe053..0000000 --- a/build/lib/Scweet/const.py +++ /dev/null @@ -1,31 +0,0 @@ -import dotenv -import os -from pathlib import Path - -current_dir = Path(__file__).parent.absolute() - - -# env_file = os.getenv("SCWEET_ENV_FILE", current_dir.parent.joinpath(".env")) -# dotenv.load_dotenv(env_file, verbose=True) - - -def load_env_variable(key, default_value=None, none_allowed=False): - v = os.getenv(key, default=default_value) - if v is None and not none_allowed: - raise RuntimeError(f"{key} returned {v} but this is not allowed!") - return v - - -def get_email(env): - dotenv.load_dotenv(env, verbose=True) - return load_env_variable("SCWEET_EMAIL", none_allowed=True) - - -def get_password(env): - dotenv.load_dotenv(env, verbose=True) - return load_env_variable("SCWEET_PASSWORD", none_allowed=True) - - -def get_username(env): - dotenv.load_dotenv(env, verbose=True) - return load_env_variable("SCWEET_USERNAME", 
none_allowed=True) diff --git a/build/lib/Scweet/scweet.py b/build/lib/Scweet/scweet.py deleted file mode 100644 index d541c79..0000000 --- a/build/lib/Scweet/scweet.py +++ /dev/null @@ -1,214 +0,0 @@ -import csv -import os -import datetime -import argparse -from time import sleep -import random -import pandas as pd - -from .utils import init_driver, get_last_date_from_csv, log_search_page, keep_scroling, dowload_images - - - -def scrape(since, until=None, words=None, to_account=None, from_account=None, mention_account=None, interval=5, lang=None, - headless=True, limit=float("inf"), display_type="Top", resume=False, proxy=None, hashtag=None, - show_images=False, save_images=False, save_dir="outputs", filter_replies=False, proximity=False, - geocode=None, minreplies=None, minlikes=None, minretweets=None): - """ - scrape data from twitter using requests, starting from until . The program make a search between each and - until it reaches the date if it's given, else it stops at the actual date. - - return: - data : df containing all tweets scraped with the associated features. - save a csv file containing all tweets scraped with the associated features. - """ - - # ------------------------- Variables : - # header of csv - header = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', 'Comments', 'Likes', 'Retweets', - 'Image link', 'Tweet URL'] - # list that contains all data - data = [] - # unique tweet ids - tweet_ids = set() - # write mode - write_mode = 'w' - # start scraping from until - # add the to to get for the first refresh - until_local = datetime.datetime.strptime(since, '%Y-%m-%d') + datetime.timedelta(days=interval) - # if =None, set it to the actual date - if until is None: - until = datetime.date.today().strftime("%Y-%m-%d") - # set refresh at 0. we refresh the page for each of time. 
- refresh = 0 - - # ------------------------- settings : - # file path - if words: - if type(words) == str : - words = words.split("//") - path = save_dir + "/" + '_'.join(words) + '_' + str(since).split(' ')[0] + '_' + \ - str(until).split(' ')[0] + '.csv' - elif from_account: - path = save_dir + "/" + from_account + '_' + str(since).split(' ')[0] + '_' + str(until).split(' ')[ - 0] + '.csv' - elif to_account: - path = save_dir + "/" + to_account + '_' + str(since).split(' ')[0] + '_' + str(until).split(' ')[ - 0] + '.csv' - elif mention_account: - path = save_dir + "/" + mention_account + '_' + str(init_date).split(' ')[0] + '_' + str(max_date).split(' ')[ - 0] + '.csv' - elif hashtag: - path = save_dir + "/" + hashtag + '_' + str(since).split(' ')[0] + '_' + str(until).split(' ')[ - 0] + '.csv' - # create the - if not os.path.exists(save_dir): - os.makedirs(save_dir) - # show images during scraping (for saving purpose) - if save_images == True: - show_images = True - # initiate the driver - driver = init_driver(headless, proxy, show_images) - # resume scraping from previous work - if resume: - since = str(get_last_date_from_csv(path))[:10] - write_mode = 'a' - - #------------------------- start scraping : keep searching until until - # open the file - with open(path, write_mode, newline='', encoding='utf-8') as f: - writer = csv.writer(f) - if write_mode == 'w': - # write the csv header - writer.writerow(header) - # log search page for a specific of time and keep scrolling unltil scrolling stops or reach the - while until_local <= datetime.datetime.strptime(until, '%Y-%m-%d'): - # number of scrolls - scroll = 0 - # convert and to str - if type(since) != str : - since = datetime.datetime.strftime(since, '%Y-%m-%d') - if type(until_local) != str : - until_local = datetime.datetime.strftime(until_local, '%Y-%m-%d') - # log search page between and - path = log_search_page(driver=driver, words=words, since=since, - until_local=until_local, to_account=to_account, - 
from_account=from_account, mention_account=mention_account, hashtag=hashtag, lang=lang, - display_type=display_type, filter_replies=filter_replies, proximity=proximity, - geocode=geocode, minreplies=minreplies, minlikes=minlikes, minretweets=minretweets) - # number of logged pages (refresh each ) - refresh += 1 - # number of days crossed - #days_passed = refresh * interval - # last position of the page : the purpose for this is to know if we reached the end of the page or not so - # that we refresh for another and - last_position = driver.execute_script("return window.pageYOffset;") - # should we keep scrolling ? - scrolling = True - print("looking for tweets between " + str(since) + " and " + str(until_local) + " ...") - print(" path : {}".format(path)) - # number of tweets parsed - tweet_parsed = 0 - # sleep - sleep(random.uniform(0.5, 1.5)) - # start scrolling and get tweets - driver, data, writer, tweet_ids, scrolling, tweet_parsed, scroll, last_position = \ - keep_scroling(driver, data, writer, tweet_ids, scrolling, tweet_parsed, limit, scroll, last_position) - - # keep updating and for every search - if type(since) == str: - since = datetime.datetime.strptime(since, '%Y-%m-%d') + datetime.timedelta(days=interval) - else: - since = since + datetime.timedelta(days=interval) - if type(since) != str: - until_local = datetime.datetime.strptime(until_local, '%Y-%m-%d') + datetime.timedelta(days=interval) - else: - until_local = until_local + datetime.timedelta(days=interval) - - data = pd.DataFrame(data, columns = ['UserScreenName', 'UserName', 'Timestamp', 'Text', 'Embedded_text', 'Emojis', - 'Comments', 'Likes', 'Retweets','Image link', 'Tweet URL']) - - # save images - if save_images==True: - print("Saving images ...") - save_images_dir = "images" - if not os.path.exists(save_images_dir): - os.makedirs(save_images_dir) - - dowload_images(data["Image link"], save_images_dir) - - # close the web driver - driver.close() - - return data - -if __name__ == '__main__': 
- parser = argparse.ArgumentParser(description='Scrape tweets.') - - parser.add_argument('--words', type=str, - help='Queries. they should be devided by "//" : Cat//Dog.', default=None) - parser.add_argument('--from_account', type=str, - help='Tweets from this account (example : @Tesla).', default=None) - parser.add_argument('--to_account', type=str, - help='Tweets replyed to this account (example : @Tesla).', default=None) - parser.add_argument('--mention_account', type=str, - help='Tweets mention a account (example : @Tesla).', default=None) - parser.add_argument('--hashtag', type=str, - help='Hashtag', default=None) - parser.add_argument('--until', type=str, - help='Max date for search query. example : %%Y-%%m-%%d.', required=True) - parser.add_argument('--since', type=str, - help='Start date for search query. example : %%Y-%%m-%%d.', required=True) - parser.add_argument('--interval', type=int, - help='Interval days between each start date and end date for search queries. example : 5.', - default=1) - parser.add_argument('--lang', type=str, - help='Tweets language. example : "en" for english and "fr" for french.', default=None) - parser.add_argument('--headless', type=bool, - help='Headless webdrives or not. True or False', default=False) - parser.add_argument('--limit', type=int, - help='Limit tweets per ', default=float("inf")) - parser.add_argument('--display_type', type=str, - help='Display type of twitter page : Latest or Top', default="Top") - parser.add_argument('--resume', type=bool, - help='Resume the last scraping. specify the csv file path.', default=False) - parser.add_argument('--proxy', type=str, - help='Proxy server', default=None) - parser.add_argument('--proximity', type=bool, - help='Proximity', default=False) - parser.add_argument('--geocode', type=str, - help='Geographical location coordinates to center the search, radius. No compatible with proximity', default=None) - parser.add_argument('--minreplies', type=int, - help='Min. 
number of replies to the tweet', default=None) - parser.add_argument('--minlikes', type=int, - help='Min. number of likes to the tweet', default=None) - parser.add_argument('--minretweets', type=int, - help='Min. number of retweets to the tweet', default=None) - - - args = parser.parse_args() - - words = args.words - until = args.until - since = args.since - interval = args.interval - lang = args.lang - headless = args.headless - limit = args.limit - display_type = args.display_type - from_account = args.from_account - to_account = args.to_account - mention_account = args.mention_account - hashtag = args.hashtag - resume = args.resume - proxy = args.proxy - proximity = args.proximity - geocode = args.geocode - minreplies = args.minreplies - minlikes = args.minlikes - minretweets = args.minlikes - - data = scrape(since=since, until=until, words=words, to_account=to_account, from_account=from_account, mention_account=mention_account, - hashtag=hashtag, interval=interval, lang=lang, headless=headless, limit=limit, - display_type=display_type, resume=resume, proxy=proxy, filter_replies=False, proximity=proximity, - geocode=geocode, minreplies=minreplies, minlikes=minlikes, minretweets=minretweets) diff --git a/build/lib/Scweet/user.py b/build/lib/Scweet/user.py deleted file mode 100644 index 40a5f3b..0000000 --- a/build/lib/Scweet/user.py +++ /dev/null @@ -1,125 +0,0 @@ -from . 
import utils -from time import sleep -import random -import json - - -def get_user_information(users, driver=None, headless=True): - """ get user information if the "from_account" argument is specified """ - - driver = utils.init_driver(headless=headless) - - users_info = {} - - for i, user in enumerate(users): - - log_user_page(user, driver) - - if user is not None: - - try: - following = driver.find_element_by_xpath( - '//a[contains(@href,"/following")]/span[1]/span[1]').text - followers = driver.find_element_by_xpath( - '//a[contains(@href,"/followers")]/span[1]/span[1]').text - except Exception as e: - # print(e) - return - - try: - element = driver.find_element_by_xpath('//div[contains(@data-testid,"UserProfileHeader_Items")]//a[1]') - website = element.get_attribute("href") - except Exception as e: - # print(e) - website = "" - - try: - desc = driver.find_element_by_xpath('//div[contains(@data-testid,"UserDescription")]').text - except Exception as e: - # print(e) - desc = "" - a = 0 - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[3]').text - birthday = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[2]').text - location = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - except Exception as e: - # print(e) - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[2]').text - span1 = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - if hasNumbers(span1): - birthday = span1 - location = "" - else: - location = span1 - birthday = "" - except Exception as e: - # print(e) - try: - join_date = driver.find_element_by_xpath( - '//div[contains(@data-testid,"UserProfileHeader_Items")]/span[1]').text - birthday = "" - location = "" - except Exception as e: - # print(e) - join_date = "" - 
birthday = "" - location = "" - print("--------------- " + user + " information : ---------------") - print("Following : ", following) - print("Followers : ", followers) - print("Location : ", location) - print("Join date : ", join_date) - print("Birth date : ", birthday) - print("Description : ", desc) - print("Website : ", website) - users_info[user] = [following, followers, join_date, birthday, location, website, desc] - - if i == len(users) - 1: - driver.close() - return users_info - else: - print("You must specify the user") - continue - - -def log_user_page(user, driver, headless=True): - sleep(random.uniform(1, 2)) - driver.get('https://twitter.com/' + user) - sleep(random.uniform(1, 2)) - - -def get_users_followers(users, env, verbose=1, headless=True, wait=2, limit=float('inf'), file_path=None): - followers = utils.get_users_follow(users, headless, env, "followers", verbose, wait=wait, limit=limit) - - if file_path == None: - file_path = 'outputs/' + str(users[0]) + '_' + str(users[-1]) + '_' + 'followers.json' - else: - file_path = file_path + str(users[0]) + '_' + str(users[-1]) + '_' + 'followers.json' - with open(file_path, 'w') as f: - json.dump(followers, f) - print(f"file saved in {file_path}") - return followers - - -def get_users_following(users, env, verbose=1, headless=True, wait=2, limit=float('inf'), file_path=None): - following = utils.get_users_follow(users, headless, env, "following", verbose, wait=wait, limit=limit) - - if file_path == None: - file_path = 'outputs/' + str(users[0]) + '_' + str(users[-1]) + '_' + 'following.json' - else: - file_path = file_path + str(users[0]) + '_' + str(users[-1]) + '_' + 'following.json' - with open(file_path, 'w') as f: - json.dump(following, f) - print(f"file saved in {file_path}") - return following - - -def hasNumbers(inputString): - return any(char.isdigit() for char in inputString) diff --git a/build/lib/Scweet/utils.py b/build/lib/Scweet/utils.py deleted file mode 100644 index ef3cbf8..0000000 --- 
a/build/lib/Scweet/utils.py +++ /dev/null @@ -1,414 +0,0 @@ -from io import StringIO, BytesIO -import os -import re -from time import sleep -import random -import chromedriver_autoinstaller -from selenium.common.exceptions import NoSuchElementException -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -import datetime -import pandas as pd -import platform -from selenium.webdriver.common.keys import Keys -# import pathlib - -from selenium.webdriver.support.wait import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.common.by import By -from . import const -import urllib - -from .const import get_username, get_password, get_email - - -# current_dir = pathlib.Path(__file__).parent.absolute() - -def get_data(card, save_images=False, save_dir=None): - """Extract data from tweet card""" - image_links = [] - - try: - username = card.find_element_by_xpath('.//span').text - except: - return - - try: - handle = card.find_element_by_xpath('.//span[contains(text(), "@")]').text - except: - return - - try: - postdate = card.find_element_by_xpath('.//time').get_attribute('datetime') - except: - return - - try: - text = card.find_element_by_xpath('.//div[2]/div[2]/div[1]').text - except: - text = "" - - try: - embedded = card.find_element_by_xpath('.//div[2]/div[2]/div[2]').text - except: - embedded = "" - - # text = comment + embedded - - try: - reply_cnt = card.find_element_by_xpath('.//div[@data-testid="reply"]').text - except: - reply_cnt = 0 - - try: - retweet_cnt = card.find_element_by_xpath('.//div[@data-testid="retweet"]').text - except: - retweet_cnt = 0 - - try: - like_cnt = card.find_element_by_xpath('.//div[@data-testid="like"]').text - except: - like_cnt = 0 - - try: - elements = card.find_elements_by_xpath('.//div[2]/div[2]//img[contains(@src, "https://pbs.twimg.com/")]') - for element in elements: - image_links.append(element.get_attribute('src')) - except: - image_links = 
[] - - # if save_images == True: - # for image_url in image_links: - # save_image(image_url, image_url, save_dir) - # handle promoted tweets - - try: - promoted = card.find_element_by_xpath('.//div[2]/div[2]/[last()]//span').text == "Promoted" - except: - promoted = False - if promoted: - return - - # get a string of all emojis contained in the tweet - try: - emoji_tags = card.find_elements_by_xpath('.//img[contains(@src, "emoji")]') - except: - return - emoji_list = [] - for tag in emoji_tags: - try: - filename = tag.get_attribute('src') - emoji = chr(int(re.search(r'svg\/([a-z0-9]+)\.svg', filename).group(1), base=16)) - except AttributeError: - continue - if emoji: - emoji_list.append(emoji) - emojis = ' '.join(emoji_list) - - # tweet url - try: - element = card.find_element_by_xpath('.//a[contains(@href, "/status/")]') - tweet_url = element.get_attribute('href') - except: - return - - tweet = ( - username, handle, postdate, text, embedded, emojis, reply_cnt, retweet_cnt, like_cnt, image_links, tweet_url) - return tweet - - -def init_driver(headless=True, proxy=None, show_images=False, option=None): - """ initiate a chromedriver instance - --option : other option to add (str) - """ - - # create instance of web driver - chromedriver_path = chromedriver_autoinstaller.install() - # options - options = Options() - if headless is True: - print("Scraping on headless mode.") - options.add_argument('--disable-gpu') - options.headless = True - else: - options.headless = False - options.add_argument('log-level=3') - if proxy is not None: - options.add_argument('--proxy-server=%s' % proxy) - print("using proxy : ", proxy) - if show_images == False: - prefs = {"profile.managed_default_content_settings.images": 2} - options.add_experimental_option("prefs", prefs) - if option is not None: - options.add_argument(option) - driver = webdriver.Chrome(options=options, executable_path=chromedriver_path) - driver.set_page_load_timeout(100) - return driver - - -def 
log_search_page(driver, since, until_local, lang, display_type, words, to_account, from_account, mention_account, - hashtag, filter_replies, proximity, - geocode, minreplies, minlikes, minretweets): - """ Search for this query between since and until_local""" - # format the , and - from_account = "(from%3A" + from_account + ")%20" if from_account is not None else "" - to_account = "(to%3A" + to_account + ")%20" if to_account is not None else "" - mention_account = "(%40" + mention_account + ")%20" if mention_account is not None else "" - hash_tags = "(%23" + hashtag + ")%20" if hashtag is not None else "" - - if words is not None: - if len(words) == 1: - words = "(" + str(''.join(words)) + ")%20" - else: - words = "(" + str('%20OR%20'.join(words)) + ")%20" - else: - words = "" - - if lang is not None: - lang = 'lang%3A' + lang - else: - lang = "" - - until_local = "until%3A" + until_local + "%20" - since = "since%3A" + since + "%20" - - if display_type == "Latest" or display_type == "latest": - display_type = "&f=live" - elif display_type == "Image" or display_type == "image": - display_type = "&f=image" - else: - display_type = "" - - # filter replies - if filter_replies == True: - filter_replies = "%20-filter%3Areplies" - else: - filter_replies = "" - # geo - if geocode is not None: - geocode = "%20geocode%3A" + geocode - else: - geocode = "" - # min number of replies - if minreplies is not None: - minreplies = "%20min_replies%3A" + str(minreplies) - else: - minreplies = "" - # min number of likes - if minlikes is not None: - minlikes = "%20min_faves%3A" + str(minlikes) - else: - minlikes = "" - # min number of retweets - if minretweets is not None: - minretweets = "%20min_retweets%3A" + str(minretweets) - else: - minretweets = "" - - # proximity - if proximity == True: - proximity = "&lf=on" # at the end - else: - proximity = "" - - path = 'https://twitter.com/search?q=' + words + from_account + to_account + mention_account + hash_tags + until_local + since + 
lang + filter_replies + geocode + minreplies + minlikes + minretweets + '&src=typed_query' + display_type + proximity - driver.get(path) - return path - - -def get_last_date_from_csv(path): - df = pd.read_csv(path) - return datetime.datetime.strftime(max(pd.to_datetime(df["Timestamp"])), '%Y-%m-%dT%H:%M:%S.000Z') - - -def log_in(driver, env, timeout=20, wait=4): - email = get_email(env) # const.EMAIL - password = get_password(env) # const.PASSWORD - username = get_username(env) # const.USERNAME - - driver.get('https://twitter.com/i/flow/login') - - email_xpath = '//input[@autocomplete="username"]' - password_xpath = '//input[@autocomplete="current-password"]' - username_xpath = '//input[@data-testid="ocfEnterTextTextInput"]' - - sleep(random.uniform(wait, wait + 1)) - - # enter email - email_el = driver.find_element_by_xpath(email_xpath) - sleep(random.uniform(wait, wait + 1)) - email_el.send_keys(email) - sleep(random.uniform(wait, wait + 1)) - email_el.send_keys(Keys.RETURN) - sleep(random.uniform(wait, wait + 1)) - # in case twitter spotted unusual login activity : enter your username - if check_exists_by_xpath(username_xpath, driver): - username_el = driver.find_element_by_xpath(username_xpath) - sleep(random.uniform(wait, wait + 1)) - username_el.send_keys(username) - sleep(random.uniform(wait, wait + 1)) - username_el.send_keys(Keys.RETURN) - sleep(random.uniform(wait, wait + 1)) - # enter password - password_el = driver.find_element_by_xpath(password_xpath) - password_el.send_keys(password) - sleep(random.uniform(wait, wait + 1)) - password_el.send_keys(Keys.RETURN) - sleep(random.uniform(wait, wait + 1)) - - -def keep_scroling(driver, data, writer, tweet_ids, scrolling, tweet_parsed, limit, scroll, last_position, - save_images=False): - """ scrolling function for tweets crawling""" - - save_images_dir = "/images" - - if save_images == True: - if not os.path.exists(save_images_dir): - os.mkdir(save_images_dir) - - while scrolling and tweet_parsed < limit: - 
sleep(random.uniform(0.5, 1.5)) - # get the card of tweets - page_cards = driver.find_elements_by_xpath('//article[@data-testid="tweet"]') # changed div by article - for card in page_cards: - tweet = get_data(card, save_images, save_images_dir) - if tweet: - # check if the tweet is unique - tweet_id = ''.join(tweet[:-2]) - if tweet_id not in tweet_ids: - tweet_ids.add(tweet_id) - data.append(tweet) - last_date = str(tweet[2]) - print("Tweet made at: " + str(last_date) + " is found.") - writer.writerow(tweet) - tweet_parsed += 1 - if tweet_parsed >= limit: - break - scroll_attempt = 0 - while tweet_parsed < limit: - # check scroll position - scroll += 1 - print("scroll ", scroll) - sleep(random.uniform(0.5, 1.5)) - driver.execute_script('window.scrollTo(0, document.body.scrollHeight);') - curr_position = driver.execute_script("return window.pageYOffset;") - if last_position == curr_position: - scroll_attempt += 1 - # end of scroll region - if scroll_attempt >= 2: - scrolling = False - break - else: - sleep(random.uniform(0.5, 1.5)) # attempt another scroll - else: - last_position = curr_position - break - return driver, data, writer, tweet_ids, scrolling, tweet_parsed, scroll, last_position - - -def get_users_follow(users, headless, env, follow=None, verbose=1, wait=2, limit=float('inf')): - """ get the following or followers of a list of users """ - - # initiate the driver - driver = init_driver(headless=headless) - sleep(wait) - # log in (the .env file should contain the username and password) - # driver.get('https://www.twitter.com/login') - log_in(driver, env, wait=wait) - sleep(wait) - # followers and following dict of each user - follows_users = {} - - for user in users: - # if the login fails, find the new log in button and log in again. - if check_exists_by_link_text("Log in", driver): - print("Login failed. 
Retry...") - login = driver.find_element_by_link_text("Log in") - sleep(random.uniform(wait - 0.5, wait + 0.5)) - driver.execute_script("arguments[0].click();", login) - sleep(random.uniform(wait - 0.5, wait + 0.5)) - sleep(wait) - log_in(driver, env) - sleep(wait) - # case 2 - if check_exists_by_xpath('//input[@name="session[username_or_email]"]', driver): - print("Login failed. Retry...") - sleep(wait) - log_in(driver, env) - sleep(wait) - print("Crawling " + user + " " + follow) - driver.get('https://twitter.com/' + user + '/' + follow) - sleep(random.uniform(wait - 0.5, wait + 0.5)) - # check if we must keep scrolling - scrolling = True - last_position = driver.execute_script("return window.pageYOffset;") - follows_elem = [] - follow_ids = set() - is_limit = False - while scrolling and not is_limit: - # get the card of following or followers - # this is the primaryColumn attribute that contains both followings and followers - primaryColumn = driver.find_element_by_xpath('//div[contains(@data-testid,"primaryColumn")]') - # extract only the Usercell - page_cards = primaryColumn.find_elements_by_xpath('//div[contains(@data-testid,"UserCell")]') - for card in page_cards: - # get the following or followers element - element = card.find_element_by_xpath('.//div[1]/div[1]/div[1]//a[1]') - follow_elem = element.get_attribute('href') - # append to the list - follow_id = str(follow_elem) - follow_elem = '@' + str(follow_elem).split('/')[-1] - if follow_id not in follow_ids: - follow_ids.add(follow_id) - follows_elem.append(follow_elem) - if len(follows_elem) >= limit: - is_limit = True - break - if verbose: - print(follow_elem) - print("Found " + str(len(follows_elem)) + " " + follow) - scroll_attempt = 0 - while not is_limit: - sleep(random.uniform(wait - 0.5, wait + 0.5)) - driver.execute_script('window.scrollTo(0, document.body.scrollHeight);') - sleep(random.uniform(wait - 0.5, wait + 0.5)) - curr_position = driver.execute_script("return window.pageYOffset;") - if 
last_position == curr_position: - scroll_attempt += 1 - # end of scroll region - if scroll_attempt >= 2: - scrolling = False - break - else: - sleep(random.uniform(wait - 0.5, wait + 0.5)) # attempt another scroll - else: - last_position = curr_position - break - - follows_users[user] = follows_elem - - return follows_users - - -def check_exists_by_link_text(text, driver): - try: - driver.find_element_by_link_text(text) - except NoSuchElementException: - return False - return True - - -def check_exists_by_xpath(xpath, driver): - timeout = 3 - try: - driver.find_element_by_xpath(xpath) - except NoSuchElementException: - return False - return True - - -def dowload_images(urls, save_dir): - for i, url_v in enumerate(urls): - for j, url in enumerate(url_v): - urllib.request.urlretrieve(url, save_dir + '/' + str(i + 1) + '_' + str(j + 1) + ".jpg")