diff --git a/stackoversight/scraping/site.py b/stackoversight/scraping/abstractsite.py similarity index 67% rename from stackoversight/scraping/site.py rename to stackoversight/scraping/abstractsite.py index 094f070..77380c9 100644 --- a/stackoversight/scraping/site.py +++ b/stackoversight/scraping/abstractsite.py @@ -1,27 +1,23 @@ -# For the proxy error and the cook_soup type specification -import requests -# For nap time if we're nice to the process and the site -from time import sleep -# For site request limit management +import logging +import threading import time -# To parse the HTML documents +from time import sleep + +import requests from bs4 import BeautifulSoup -# For balancing client requests to the site from stackoversight.scraping.site_balancer import SiteBalancer -class Site(object): - balancer = None - last_pause_time = None +class AbstractSite(object): def __init__(self, sessions: list, timeout_sec: int, limit: int): self.limit = limit self.timeout_sec = timeout_sec - self.back_off = 0 + self.pause_lock = threading.Lock() + self.last_pause_time = None - if not self.balancer: - balancer = SiteBalancer(sessions, timeout_sec, limit) + self.balancer = SiteBalancer(sessions, timeout_sec, limit) def pause(self, pause_time): if not pause_time and self.limit: @@ -33,13 +29,6 @@ def pause(self, pause_time): if pause_time < min_pause: pause_time = min_pause - # can not wait less than the back off field if it is set - if pause_time < self.back_off: - pause_time = self.back_off - - # returns the field to zero as it should be set only each time it is returned by the api - self.back_off = 0 - # only wait the diff between the time already elapsed from the last request and the pause_time if self.last_pause_time: time_elapsed = time.time() - self.last_pause_time @@ -50,30 +39,26 @@ def pause(self, pause_time): else: pause_time = 0 - # initialize the last_pause_time field and sleep - sleep(pause_time) - self.last_pause_time = time.time() - - return self.last_pause_time + with self.pause_lock: + # can not wait less than the back off field if it is set + # returns the field to zero as it should be set only each time it is returned by the api + # TODO: eventually this back_off field could be tied to the method called, and only threads using that + # method must wait the extra + back_off = self.clear_back_off() + if pause_time < back_off: + pause_time = back_off - def create_parent_link(self, *args): - raise NotImplementedError + logging.info(f'back_off in {threading.current_thread().getName()} is being handled') - def get_child_links(self, *args): - raise NotImplementedError - - def handle_request(self, url, session): - raise NotImplementedError + # initialize the last_pause_time field and sleep + sleep(pause_time) + self.last_pause_time = time.time() - def get_min_pause(self): - raise NotImplementedError + return self.last_pause_time def process_request(self, url: str, pause=False, pause_time=None): - # TODO: Set this up to wait on a signal from a timer thread so that it isn't a busy wait # get the next id to use or wait until one is ready - while not self.balancer.is_ready(): - sleep(1) - print("Waiting...") + self.balancer.ready.wait() key = next(self.balancer) @@ -85,8 +70,8 @@ def process_request(self, url: str, pause=False, pause_time=None): try: response = self.handle_request(url, key) except: - print("Make sure Archituethis is running or comment out setting the proxy environment variables!\n" - "Could also be an issue with your token?") + logging.critical(f'In 
{threading.current_thread().getName()} error while requesting {url}, ' + f'raising exception.') raise requests.exceptions.ProxyError # mark the request as being made @@ -94,6 +79,21 @@ def process_request(self, url: str, pause=False, pause_time=None): return response, key, request_count + def create_parent_link(self, *args): + raise NotImplementedError + + def get_child_links(self, *args): + raise NotImplementedError + + def handle_request(self, url, session): + raise NotImplementedError + + def get_min_pause(self): + raise NotImplementedError + + def clear_back_off(self): + raise NotImplementedError + @staticmethod def cook_soup(response: requests.Response): return BeautifulSoup(response.text, 'html.parser') diff --git a/stackoversight/scraping/gui.py b/stackoversight/scraping/gui.py index 433c814..a5bffd1 100644 --- a/stackoversight/scraping/gui.py +++ b/stackoversight/scraping/gui.py @@ -1,36 +1,36 @@ -# For gui -from tkinter import * - -# TODO: setup as queue of parent links that this thread will continuously process -root = Tk() - -# TODO: show list of processed and to be processed links in this frame -processing_frame = Frame(root) - -to_be_processed_label = Label(processing_frame, text="Links to be processed") -has_been_processed_label = Label(processing_frame, text="Links that have been processed") - -to_be_processed_label.grid(row=0, column=0) -has_been_processed_label.grid(row=0, column=1) - -processing_frame.pack(side=BOTTOM) - -# TODO: have all the configurable fields able to be set here -link_generation_frame = Frame(root) - -# parent_link_entry = Entry(root) - -fields = [] -for field in site.fields: - var = StringVar() - label = Label(link_generation_frame, text=field.capitalize()) - entry = Entry(link_generation_frame, textvariable=var) - - label.grid(row=len(fields), column=0) - entry.grid(row=len(fields), column=1) - - fields.append((field, var, label, entry)) - -link_generation_frame.pack(side=TOP) - -root.mainloop() +# # For gui +# from tkinter import * +# +# # TODO: setup as queue of parent links that this thread will continuously process +# root = Tk() +# +# # TODO: show list of processed and to be processed links in this frame +# processing_frame = Frame(root) +# +# to_be_processed_label = Label(processing_frame, text="Links to be processed") +# has_been_processed_label = Label(processing_frame, text="Links that have been processed") +# +# to_be_processed_label.grid(row=0, column=0) +# has_been_processed_label.grid(row=0, column=1) +# +# processing_frame.pack(side=BOTTOM) +# +# # TODO: have all the configurable fields able to be set here +# link_generation_frame = Frame(root) +# +# # parent_link_entry = Entry(root) +# +# fields = [] +# for field in site.fields: +# var = StringVar() +# label = Label(link_generation_frame, text=field.capitalize()) +# entry = Entry(link_generation_frame, textvariable=var) +# +# label.grid(row=len(fields), column=0) +# entry.grid(row=len(fields), column=1) +# +# fields.append((field, var, label, entry)) +# +# link_generation_frame.pack(side=TOP) +# +# root.mainloop() diff --git a/stackoversight/scraping/queue_monitor.py b/stackoversight/scraping/queue_monitor.py deleted file mode 100644 index 3dae0d4..0000000 --- a/stackoversight/scraping/queue_monitor.py +++ /dev/null @@ -1,11 +0,0 @@ -from threading import Thread -from queue import Queue - - -class QueueMonitor(Thread): - def __init__(self, queue: Queue): - Thread.__init__(self) - self.queue = queue - - def run(self): - self.queue.join() diff --git 
a/stackoversight/scraping/read_write_Lock.py b/stackoversight/scraping/read_write_Lock.py new file mode 100644 index 0000000..52134bd --- /dev/null +++ b/stackoversight/scraping/read_write_Lock.py @@ -0,0 +1,273 @@ +import copy +import threading +import time +import unittest + +""" + Credit to original author: Mateusz Kobos + I just added ReaderLock and WriterLock for functionality with with statements +""" + + +class RWLock: + """Synchronization object used in a solution of so-called second + readers-writers problem. In this problem, many readers can simultaneously + access a share, and a writer has an exclusive access to this share. + Additionally, the following constraints should be met: + 1) no reader should be kept waiting if the share is currently opened for + reading unless a writer is also waiting for the share, + 2) no writer should be kept waiting for the share longer than absolutely + necessary. + + The implementation is based on [1, secs. 4.2.2, 4.2.6, 4.2.7] + with a modification -- adding an additional lock (C{self.__readers_queue}) + -- in accordance with [2]. + + Sources: + [1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008 + [2] P.J. Courtois, F. Heymans, D.L. Parnas: + "Concurrent Control with 'Readers' and 'Writers'", + Communications of the ACM, 1971 (via [3]) + [3] http://en.wikipedia.org/wiki/Readers-writers_problem + """ + + def __init__(self): + self.__read_switch = _LightSwitch() + self.__write_switch = _LightSwitch() + self.__no_readers = threading.Lock() + self.__no_writers = threading.Lock() + self.__readers_queue = threading.Lock() + """A lock giving an even higher priority to the writer in certain + cases (see [2] for a discussion)""" + + self.read_lock = ReadLock(self) + self.write_lock = WriteLock(self) + + def reader_acquire(self): + self.__readers_queue.acquire() + self.__no_readers.acquire() + self.__read_switch.acquire(self.__no_writers) + self.__no_readers.release() + self.__readers_queue.release() + + def reader_release(self): + self.__read_switch.release(self.__no_writers) + + def writer_acquire(self): + self.__write_switch.acquire(self.__no_readers) + self.__no_writers.acquire() + + def writer_release(self): + self.__no_writers.release() + self.__write_switch.release(self.__no_readers) + + +class _LightSwitch: + """An auxiliary "light switch"-like object. The first thread turns on the + "switch", the last one turns it off (see [1, sec. 
4.2.2] for details).""" + + def __init__(self): + self.__counter = 0 + self.__mutex = threading.Lock() + + def acquire(self, lock): + self.__mutex.acquire() + self.__counter += 1 + if self.__counter == 1: + lock.acquire() + self.__mutex.release() + + def release(self, lock): + self.__mutex.acquire() + self.__counter -= 1 + if self.__counter == 0: + lock.release() + self.__mutex.release() + + +class Writer(threading.Thread): + def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write): + """ + @param buffer_: common buffer_ shared by the readers and writers + @type buffer_: list + @type rw_lock: L{RWLock} + @param init_sleep_time: sleep time before doing any action + @type init_sleep_time: C{float} + @param sleep_time: sleep time while in critical section + @type sleep_time: C{float} + @param to_write: data that will be appended to the buffer + """ + threading.Thread.__init__(self) + self.__buffer = buffer_ + self.__rw_lock = rw_lock + self.__init_sleep_time = init_sleep_time + self.__sleep_time = sleep_time + self.__to_write = to_write + self.entry_time = None + """Time of entry to the critical section""" + self.exit_time = None + """Time of exit from the critical section""" + + def run(self): + time.sleep(self.__init_sleep_time) + self.__rw_lock.writer_acquire() + self.entry_time = time.time() + time.sleep(self.__sleep_time) + self.__buffer.append(self.__to_write) + self.exit_time = time.time() + self.__rw_lock.writer_release() + + +class Reader(threading.Thread): + def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time): + """ + @param buffer_: common buffer shared by the readers and writers + @type buffer_: list + @type rw_lock: L{RWLock} + @param init_sleep_time: sleep time before doing any action + @type init_sleep_time: C{float} + @param sleep_time: sleep time while in critical section + @type sleep_time: C{float} + """ + threading.Thread.__init__(self) + self.__buffer = buffer_ + self.__rw_lock = rw_lock + self.__init_sleep_time = init_sleep_time + self.__sleep_time = sleep_time + self.buffer_read = None + """a copy of a the buffer read while in critical section""" + self.entry_time = None + """Time of entry to the critical section""" + self.exit_time = None + """Time of exit from the critical section""" + + def run(self): + time.sleep(self.__init_sleep_time) + self.__rw_lock.reader_acquire() + self.entry_time = time.time() + time.sleep(self.__sleep_time) + self.buffer_read = copy.deepcopy(self.__buffer) + self.exit_time = time.time() + self.__rw_lock.reader_release() + + +class RWLockTestCase(unittest.TestCase): + def test_readers_nonexclusive_access(self): + (buffer_, rw_lock, threads) = self.__init_variables() + + threads.append(Reader(buffer_, rw_lock, 0, 0)) + threads.append(Writer(buffer_, rw_lock, 0.2, 0.4, 1)) + threads.append(Reader(buffer_, rw_lock, 0.3, 0.3)) + threads.append(Reader(buffer_, rw_lock, 0.5, 0)) + + self.__start_and_join_threads(threads) + + # The third reader should enter after the second one but it should + # exit before the second one exits + # (i.e. 
the readers should be in the critical section + # at the same time) + + self.assertEqual([], threads[0].buffer_read) + self.assertEqual([1], threads[2].buffer_read) + self.assertEqual([1], threads[3].buffer_read) + self.assert_(threads[1].exit_time <= threads[2].entry_time) + self.assert_(threads[2].entry_time <= threads[3].entry_time) + self.assert_(threads[3].exit_time < threads[2].exit_time) + + def test_writers_exclusive_access(self): + (buffer_, rw_lock, threads) = self.__init_variables() + + threads.append(Writer(buffer_, rw_lock, 0, 0.4, 1)) + threads.append(Writer(buffer_, rw_lock, 0.1, 0, 2)) + threads.append(Reader(buffer_, rw_lock, 0.2, 0)) + + self.__start_and_join_threads(threads) + + # The second writer should wait for the first one to exit + + self.assertEqual([1, 2], threads[2].buffer_read) + self.assert_(threads[0].exit_time <= threads[1].entry_time) + self.assert_(threads[1].exit_time <= threads[2].exit_time) + + def test_writer_priority(self): + (buffer_, rw_lock, threads) = self.__init_variables() + + threads.append(Writer(buffer_, rw_lock, 0, 0, 1)) + threads.append(Reader(buffer_, rw_lock, 0.1, 0.4)) + threads.append(Writer(buffer_, rw_lock, 0.2, 0, 2)) + threads.append(Reader(buffer_, rw_lock, 0.3, 0)) + threads.append(Reader(buffer_, rw_lock, 0.3, 0)) + + self.__start_and_join_threads(threads) + + # The second writer should go before the second and the third reader + + self.assertEqual([1], threads[1].buffer_read) + self.assertEqual([1, 2], threads[3].buffer_read) + self.assertEqual([1, 2], threads[4].buffer_read) + self.assert_(threads[0].exit_time < threads[1].entry_time) + self.assert_(threads[1].exit_time <= threads[2].entry_time) + self.assert_(threads[2].exit_time <= threads[3].entry_time) + self.assert_(threads[2].exit_time <= threads[4].entry_time) + + def test_many_writers_priority(self): + (buffer_, rw_lock, threads) = self.__init_variables() + + threads.append(Writer(buffer_, rw_lock, 0, 0, 1)) + threads.append(Reader(buffer_, rw_lock, 0.1, 0.6)) + threads.append(Writer(buffer_, rw_lock, 0.2, 0.1, 2)) + threads.append(Reader(buffer_, rw_lock, 0.3, 0)) + threads.append(Reader(buffer_, rw_lock, 0.4, 0)) + threads.append(Writer(buffer_, rw_lock, 0.5, 0.1, 3)) + + self.__start_and_join_threads(threads) + + # The two last writers should go first -- after the first reader and + # before the second and the third reader + + self.assertEqual([1], threads[1].buffer_read) + self.assertEqual([1, 2, 3], threads[3].buffer_read) + self.assertEqual([1, 2, 3], threads[4].buffer_read) + self.assert_(threads[0].exit_time < threads[1].entry_time) + self.assert_(threads[1].exit_time <= threads[2].entry_time) + self.assert_(threads[1].exit_time <= threads[5].entry_time) + self.assert_(threads[2].exit_time <= threads[3].entry_time) + self.assert_(threads[2].exit_time <= threads[4].entry_time) + self.assert_(threads[5].exit_time <= threads[3].entry_time) + self.assert_(threads[5].exit_time <= threads[4].entry_time) + + @staticmethod + def __init_variables(): + buffer_ = [] + rw_lock = RWLock() + threads = [] + return buffer_, rw_lock, threads + + @staticmethod + def __start_and_join_threads(threads): + for t in threads: + t.start() + for t in threads: + t.join() + + +class ReadLock: + def __init__(self, read_write_lock: RWLock): + self.read_write_lock = read_write_lock + + def __enter__(self): + self.read_write_lock.reader_acquire() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.read_write_lock.reader_release() + + +class WriteLock: + def __init__(self, 
read_write_lock: RWLock): + self.read_write_lock = read_write_lock + + def __enter__(self): + self.read_write_lock.writer_acquire() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.read_write_lock.writer_release() diff --git a/stackoversight/scraping/release_heap.py b/stackoversight/scraping/release_heap.py index 8254108..ea86ac2 100644 --- a/stackoversight/scraping/release_heap.py +++ b/stackoversight/scraping/release_heap.py @@ -1,22 +1,34 @@ -# Need a heap ofc import heapq -# For release timers +import logging import threading -# For the release queue from queue import Queue +from stackoversight.scraping.read_write_Lock import RWLock -class ReleaseHeap(object): + +class AbstractReleaseHeap(object): """ elem_list must be a list of mutable tuples! the left value needs to be comparable and incrementable/decrementable """ + def __init__(self, elem_list: list, time_sec: int): # init the heap and queue and assign variables heapq.heapify(elem_list) self.heap = elem_list self.time_sec = time_sec + + self.heap_lock = RWLock() + self.queue_lock = threading.Lock() self.release_queue = Queue() + self.ready = threading.Event() + + # update and make sure the keys are under the limit + self.update_ready() + if not self.ready.is_set(): + logging.critical('The keys used to initialize are already over the request limit!' + 'Can\'t initialize properly, raising exception.') + raise ValueError # quick access to pair by it's value self.hash_map = dict([(pair[1], pair) for pair in self.heap]) @@ -25,16 +37,24 @@ def __iter__(self): return self.heap def __next__(self): - return self.heap[0][1] + with self.heap_lock.read_lock: + ret = self.heap[0][1] + + return ret def capture(self): # pop off the top of the heap and increment the first value of the pair - pair = heapq.heappop(self.heap) - pair[0] += 1 - heapq.heappush(self.heap, pair) + with self.heap_lock.write_lock: + pair = heapq.heappop(self.heap) + pair[0] += 1 + heapq.heappush(self.heap, pair) + + # Update ready event + self.update_ready() # put the value into the queue - self.release_queue.put(pair[1]) + with self.queue_lock: + self.release_queue.put(pair[1]) # start the timer for release threading.Timer(self.time_sec, self.release).start() @@ -43,8 +63,23 @@ def capture(self): def release(self): # get the id of the next to be released and with that the pair - top_value = self.release_queue.get() + with self.queue_lock: + top_value = self.release_queue.get() pair = self.hash_map[top_value] - # beginning value can be non-zero and whatever you choose should be returned to in the end - pair[0] -= 1 + # update heap and ready + with self.heap_lock.write_lock: + # beginning value can be non-zero and whatever you choose should be returned to in the end + pair[0] -= 1 + heapq.heapify(self.heap) + + self.update_ready() + + def update_ready(self): + raise NotImplementedError + + +class ReadyReleaseHeap(AbstractReleaseHeap): + def update_ready(self): + if not self.ready.is_set(): + self.ready.set() diff --git a/stackoversight/scraping/scraper.py b/stackoversight/scraping/scraper.py index a797ac3..1cd7cc6 100644 --- a/stackoversight/scraping/scraper.py +++ b/stackoversight/scraping/scraper.py @@ -1,20 +1,22 @@ -# To set the http_proxy environment variable +import json +import logging import os -# For making the site requests and overall request management -from stackoversight.scraping.stack_overflow import StackOverflow -# For child link queue -from queue import Queue -# For threading the scraping process import threading -# For serialization -import json 
-# For thread management +import time +from queue import Queue + +from stackoversight.scraping.stack_overflow import StackOverflow from stackoversight.scraping.thread_executioner import ThreadExecutioner class StackOversight(object): - def __init__(self, client_keys: list, proxy=None): + code_lock = threading.Lock() + text_lock = threading.Lock() + + def __init__(self, client_keys: list, kill: threading.Event, proxy=None): if proxy: + logging.info(f'Proxy {proxy} is being used.') + # address of the proxy server self.proxy = 'http://localhost:5050' @@ -29,8 +31,7 @@ def __init__(self, client_keys: list, proxy=None): self.thread_handles = [] self.file_handles = [] - self.code_lock = threading.Lock() - self.text_lock = threading.Lock() + self.kill = kill def start(self, parent_link_queue: Queue, code_file_name='code.txt', text_file_name='text.txt'): code_io_handle = open(code_file_name, 'w') @@ -39,110 +40,123 @@ def start(self, parent_link_queue: Queue, code_file_name='code.txt', text_file_n self.file_handles.extend((code_io_handle, text_io_handle)) child_link_queue = Queue() - kill = threading.Event() parent_link_thread = threading.Thread(target=ThreadExecutioner.execute, - args=(parent_link_queue, self.site, child_link_queue, kill)) + args=(self.scrape_parent_link, parent_link_queue, self.site, + child_link_queue, self.kill)) parent_link_thread.setName("StackExchange API Manager") child_link_thread = threading.Thread(target=ThreadExecutioner.execute, - args=(child_link_queue, self.site, code_io_handle, text_io_handle, kill)) + args=(self.scrape_child_link, child_link_queue, self.site, code_io_handle, + text_io_handle, self.kill)) child_link_thread.setName("StackOverflow Scraping Manager") self.thread_handles.extend((parent_link_thread, child_link_thread)) for handle in self.thread_handles: + logging.info(f'Starting {handle.getName()}.') + handle.start() - kill.wait() + self.kill.wait() for handle in self.thread_handles: was_alive = ThreadExecutioner.murder(handle) - print(f'{handle.getName()} is {["not "] if [not was_alive] else [""]} healthy.') + logging.debug(f'{handle.getName()} was {["not "] if [not was_alive] else [""]} alive.') for file_handle in self.file_handles: file_handle.close() - def scrape_parent_links(self, input_queue: Queue, site: StackOverflow, output_queue: Queue, - failure: threading.Event): - ThreadExecutioner.execute(self.scrape_parent_link, input_queue, site, output_queue, failure) - - def scrape_child_links(self, input_queue: Queue, site: StackOverflow, code_io_handle, text_io_handle, - failure: threading.Event): - ThreadExecutioner.execute(self.scrape_child_link, input_queue, site, code_io_handle, - text_io_handle, failure) + @staticmethod + def scrape_parent_link(link: str, used_parents: Queue, site: StackOverflow, output_queue: Queue, + kill: threading.Event): + current_thread_name = threading.current_thread().getName() + has_more = True + response = None + page = 1 - def scrape_parent_link(self, link: str, used_parents: Queue, site: StackOverflow, output_queue: Queue, - failure: threading.Event): try: - has_more = True while has_more: try: - # TODO: handle None response - # TODO: make sure actually incrementing page - response = site.get_child_links(link, pause=True) + response = site.get_child_links(link + f'&{StackOverflow.fields["page"]}={page}', pause=True) except SystemExit: raise except: - # TODO: logging - failure.set() - raise + logging.critical(f'Unexpected error caught in {current_thread_name} after making' + f'request with {link}.\n{[response] if 
[response] else ["Response not captured!"]}' + f'\nNow ending process.') + kill.set() + # TODO: handle None response has_more = response[1] response = response[0] list(map(output_queue.put, response)) if not has_more: + logging.info(f'Finished with link {link}, now marking {current_thread_name} for death.') + used_parents.put(threading.currentThread()) break + else: + page += 1 + except SystemExit: - print() - # TODO: logging + logging.info(f'System exit exception raised, {current_thread_name} successfully killed.') def scrape_child_link(self, link: str, used_children: Queue, site: StackOverflow, code_io_handle, text_io_handle, - failure: threading.Event): + kill: threading.Event): + current_thread_name = threading.current_thread().getName() + response = None + try: - # TODO: thread this point on in this method for each link - # TODO: handle None response try: response = site.process_request(link, pause=True)[0] except SystemExit: raise except: - # TODO: logging - failure.set() - raise + logging.critical(f'Unexpected error caught in {current_thread_name} after making' + f'request with {link}.\n{[response] if [response] else ["Response not captured!"]}' + f'\nNow ending process.') + kill.set() + + # TODO: handle None response for code in site.get_code(response): snippet = {'snippet': code} with self.code_lock: json.dump(snippet, code_io_handle) - # code_io_handle.write(code) for text in site.get_text(response): snippet = {'snippet': text} with self.text_lock: json.dump(snippet, text_io_handle) - # text_io_handle.write(text) + logging.info(f'Finished with link {link}, now marking {current_thread_name} for death.') used_children.put(threading.current_thread()) except SystemExit: - print() - # TODO: logging + logging.info(f'System exit exception raised, {current_thread_name} successfully killed.') # for debugging only +logging.basicConfig(filename=f'scraper.{time.strftime("%Y%m%d-%H%M%S")}.log', level=logging.DEBUG) + keys = ['RGaU7lYPN8L5KbnIfkxmGQ((', '1yfsxJa1AC*GlxN6RSemCQ(('] python_posts = StackOverflow.create_parent_link(sort=StackOverflow.Sorts.votes.value, order=StackOverflow.Orders.descending.value, tag=StackOverflow.Tags.python.value, page_size=100) +java_posts = StackOverflow.create_parent_link(sort=StackOverflow.Sorts.votes.value, + order=StackOverflow.Orders.descending.value, + tag=StackOverflow.Tags.java.value, page_size=100) + link_queue = Queue() -link_queue.put(python_posts) +link_queue.put(java_posts) + +_kill = threading.Event() -scraper = StackOversight(keys) +scraper = StackOversight(keys, _kill) scraper.start(link_queue) diff --git a/stackoversight/scraping/site_balancer.py b/stackoversight/scraping/site_balancer.py index a780ded..fd252f1 100644 --- a/stackoversight/scraping/site_balancer.py +++ b/stackoversight/scraping/site_balancer.py @@ -1,20 +1,30 @@ -# Data structure for balancing -from stackoversight.scraping.release_heap import ReleaseHeap +import logging + +from stackoversight.scraping.release_heap import AbstractReleaseHeap # really just a wrapper for ReleaseHeap, designed for client_ids to be iterated through -class SiteBalancer(ReleaseHeap): +class SiteBalancer(AbstractReleaseHeap): + def __init__(self, sessions: list, timeout_sec: int, limit=None): - super().__init__(sessions, timeout_sec) self.limit = limit + super().__init__(sessions, timeout_sec) # capture signal should be sent after using the client_id, so the call is not included here def __next__(self): - if self.limit and self.heap[0][0] >= self.limit: - raise StopIteration + with 
self.heap_lock.read_lock: + if self.limit and self.heap[0][0] >= self.limit: + logging.warning(f'Limit reached for client key {self.heap[0][1]}') + + raise StopIteration + + client_id = self.heap[0][1] - client_id = self.heap[0][1] return client_id - def is_ready(self): - return not self.limit or self.heap[0][0] < self.limit + def update_ready(self): + with self.heap_lock.read_lock: + if not self.limit or self.heap[0][0] < self.limit: + self.ready.set() + else: + self.ready.clear() diff --git a/stackoversight/scraping/stack_overflow.py b/stackoversight/scraping/stack_overflow.py index e31e747..5d0de29 100644 --- a/stackoversight/scraping/stack_overflow.py +++ b/stackoversight/scraping/stack_overflow.py @@ -1,14 +1,14 @@ -# For basic Site class -from stackoversight.scraping.site import Site -# For site tags and sorts +import logging +import threading from enum import Enum -# For proxy exception -import requests -# Need that mutable tuple my dude + from recordclass.mutabletuple import mutabletuple +import requests +from stackoversight.scraping.abstractsite import AbstractSite + -class StackOverflow(Site): +class StackOverflow(AbstractSite): site = 'stackoverflow' api_url = 'https://api.stackexchange.com' api_version = '2.2' @@ -19,8 +19,12 @@ class StackOverflow(Site): min_pause = 1 / 30 page_size = 100 + req_table_lock = threading.Lock() req_table = set() + back_off_lock = threading.Lock() + back_off = 0 + fields = {'sort': 'sort', 'order': 'order', 'tag': 'tagged', @@ -56,13 +60,14 @@ class Tags(Enum): python2 = 'python-2.7' python3 = 'python-3.x' - def __init__(self, client_keys: list): - sessions = [self.init_key(key) for key in client_keys] + java = 'java' - super(StackOverflow, self).__init__(sessions, self.timeout_sec, self.limit) + def __init__(self, client_keys: list): + super(StackOverflow, self).__init__([self.init_key(key) for key in client_keys], self.timeout_sec, self.limit) def get_child_links(self, parent_link: str, pause=False, pause_time=None): response = self.process_request(parent_link, pause, pause_time) + # TODO: handle None response key = response[1] request_count = response[2] @@ -74,27 +79,35 @@ def get_child_links(self, parent_link: str, pause=False, pause_time=None): links = [item['link'] for item in response['items']] if quota_max - quota_remaining != request_count: - print(f'Request count for key {key} is off by {abs(quota_max - quota_remaining - request_count)}') - # raise ValueError + logging.warning(f'Request count for key {key} is off by {abs(quota_max - quota_remaining - request_count)}') if not links: - print('The proxy is up but it is failing to pull from the site.') - raise requests.exceptions.ProxyError + logging.critical('Failing to pull from the site, raising exception.') + raise requests.exceptions.RequestException if self.fields['back_off'] in response: - self.back_off = response[self.fields['back_off']] + logging.info(f'{threading.current_thread().getName()} received a back_off after processing {parent_link}') + + new_back_off = response[self.fields['back_off']] + + with self.back_off_lock: + if self.back_off and self.back_off < new_back_off: + self.back_off = new_back_off return links, has_more - # as a hook for future needs - def handle_request(self, url: str, key: str): - url = f'{url}&{self.fields["key"]}={key}' + def handle_request(self, link: str, key: str): + link = f'{link}&{self.fields["key"]}={key}' # TODO: have this function return None if it has already been scraped - # if url not in self.req_table: - # self.req_table.add(url) + with 
self.req_table_lock: + if link not in self.req_table: + self.req_table.add(link) + else: + logging.warning(f'{threading.current_thread().getName()} received a link, {link} that has already been' + f' scraped!') - return requests.get(url) + return requests.get(link) @staticmethod def create_parent_link(method=Methods.question.value, **kwargs): @@ -110,7 +123,9 @@ def create_parent_link(method=Methods.question.value, **kwargs): url_fields += f'{StackOverflow.fields[key]}={kwargs[key]}' - return url + url_fields + url += url_fields + logging.info(f'Parent link {url} created.') + return url @staticmethod def init_key(key: str): @@ -119,26 +134,39 @@ def init_key(key: str): f'{StackOverflow.site}&{StackOverflow.fields["key"]}={key}').json() if response['quota_max'] != StackOverflow.limit: + # TODO: later on handle by updating the limit instead + logging.critical('Limit does not match that returned by the site, raising exception.') raise ValueError - return mutabletuple(StackOverflow.limit - response['quota_remaining'], key) + return mutabletuple(StackOverflow.limit - response['quota_remaining'] + 1, key) @staticmethod def get_text(response: requests.Response): try: - return [element.get_text() for element in Site.cook_soup(response).find_all(attrs={'class': 'post-text'})] + return [element.get_text() for element in + AbstractSite.cook_soup(response).find_all(attrs={'class': 'post-text'})] except: - # can fail when none are found + logging.debug(f'In thread {threading.current_thread().getName()} no post-text found in response.') return [] @staticmethod def get_code(response: requests.Response): try: - return [element.get_text() for element in Site.cook_soup(response).find_all('code')] + return [element.get_text() for element in AbstractSite.cook_soup(response).find_all('code')] except: - # can fail when none are found + logging.debug(f'In thread {threading.current_thread().getName()} no code found in response.') return [] @staticmethod def get_min_pause(): return StackOverflow.min_pause + + @staticmethod + def clear_back_off(): + with StackOverflow.back_off_lock: + prev_back_off = StackOverflow.back_off + + if StackOverflow.back_off: + StackOverflow.back_off = 0 + + return prev_back_off diff --git a/stackoversight/scraping/thread_executioner.py b/stackoversight/scraping/thread_executioner.py index d9a9755..fee345c 100644 --- a/stackoversight/scraping/thread_executioner.py +++ b/stackoversight/scraping/thread_executioner.py @@ -1,9 +1,7 @@ -# For threading +import ctypes +import logging import threading -# For victim queue from queue import Queue -# For raising error -import ctypes class ThreadExecutioner: @@ -13,9 +11,9 @@ def mass_murder(victims: Queue): while True: victim = victims.get(block=True) ThreadExecutioner.murder(victim) + except SystemExit: - print() - # TODO: logging + logging.info(f'System exit exception raised, {threading.current_thread().getName()} successfully killed.') @staticmethod def murder(victim: threading.Thread): @@ -23,23 +21,39 @@ def murder(victim: threading.Thread): if alive: if not ctypes.pythonapi.PyThreadState_SetAsyncExc(victim, ctypes.py_object(SystemExit)): raise ChildProcessError - victim.join() + else: + logging.info(f'{victim.getName()} is dead, no need to kill prematurely.') + victim.join() return alive @staticmethod def execute(target, tasks: Queue, *args): + current_thread_name = threading.current_thread().getName() + hit_queue = Queue() thread_killer = threading.Thread(target=ThreadExecutioner.mass_murder, args=[hit_queue], daemon=True) + 
thread_killer.setName(f'{current_thread_name}\'s Thread Killer')
         thread_killer.start()
+        logging.info(f'New killer, {thread_killer.getName()}, spawned.')
+
         try:
+            worker_count = 0
+
             while True:
                 task = tasks.get(block=True)
-                print(task)
-                threading.Thread(target=target, args=(task, hit_queue, *args), daemon=True)
+                worker = threading.Thread(target=target, args=(task, hit_queue, *args), daemon=True)
+                worker.setName(f'{current_thread_name}\'s Worker #{worker_count}')
+                worker.start()
+
+                logging.info(f'New worker, {worker.getName()}, spawned for task {task}.')
+                worker_count += 1
         except SystemExit:
+            logging.info(f'System exit exception raised, {current_thread_name}\'s killer will now be killed.')
+
             ThreadExecutioner.murder(thread_killer)
-            print('Done scraping parent links')
+
+            logging.info(f'{current_thread_name} successfully killed')
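
The ReadLock/WriteLock wrappers added in read_write_Lock.py exist so release_heap.py and site_balancer.py can guard the heap with plain with-statements. A minimal usage sketch under that assumption; the shared counter below is illustrative only, not part of the project.

from stackoversight.scraping.read_write_Lock import RWLock

# Illustrative shared state protected by the reader-writer lock.
stats = {'requests': 0}
rw_lock = RWLock()

def record_request():
    # Writers take exclusive ownership; pending readers wait (writer-priority variant).
    with rw_lock.write_lock:
        stats['requests'] += 1

def read_requests():
    # Many readers may hold the lock at the same time.
    with rw_lock.read_lock:
        return stats['requests']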
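
AbstractSite.process_request now blocks on the balancer's threading.Event instead of polling is_ready() in a sleep loop. A rough sketch of that flow, assuming placeholder keys and quota values (the real mutabletuples come from StackOverflow.init_key(), and calling capture() directly here is only for illustration of the rate-limit bookkeeping).

from recordclass.mutabletuple import mutabletuple
from stackoversight.scraping.site_balancer import SiteBalancer

# Placeholder client keys and quota; real values come from the StackExchange API.
sessions = [mutabletuple(0, 'key-one'), mutabletuple(0, 'key-two')]
balancer = SiteBalancer(sessions, timeout_sec=86400, limit=10000)

def next_available_key():
    # Block until at least one key is under its request limit.
    balancer.ready.wait()
    # Peek at the least-used key; raises StopIteration if the top key hit the limit.
    key = next(balancer)
    # Record the request; a threading.Timer returns the slot after timeout_sec.
    balancer.capture()
    return key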
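
ThreadExecutioner.murder passes the Thread object itself to PyThreadState_SetAsyncExc, but CPython's API takes the thread ident and reports how many thread states it modified (more than one means the injection must be undone). A hedged sketch of the usual recipe follows; raise_in_thread is a hypothetical helper name, not something in this branch.

import ctypes
import threading

def raise_in_thread(victim: threading.Thread, exc_type=SystemExit) -> bool:
    # Returns whether the victim was still alive when the exception was injected.
    alive = victim.is_alive()
    if alive:
        # The C API wants the thread id, not the Thread object.
        modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(victim.ident), ctypes.py_object(exc_type))
        if modified == 0:
            raise ChildProcessError(f'no thread state found for {victim.getName()}')
        if modified > 1:
            # Touched more than one thread state: revert the injection immediately.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(victim.ident), None)
            raise ChildProcessError('PyThreadState_SetAsyncExc modified multiple thread states')
    victim.join()
    return alive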