diff --git a/dev_projects/ScrapingService/pom.xml b/dev_projects/ScrapingService/pom.xml index 2f576220..5c330baa 100644 --- a/dev_projects/ScrapingService/pom.xml +++ b/dev_projects/ScrapingService/pom.xml @@ -43,7 +43,20 @@ test - + + + io.reactivex.rxjava3 + rxjava + 3.0.0 + + + + io.projectreactor + reactor-core + 3.6.5 + + + \ No newline at end of file diff --git a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/ScrappingApplication.java b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/ScrappingApplication.java index 4a339fad..65b27384 100644 --- a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/ScrappingApplication.java +++ b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/ScrappingApplication.java @@ -1,11 +1,14 @@ package com.yen.scrpe; +import com.yen.scrpe.Task.Factory.ScrapeTaskFactoryRxJavaGpt; import com.yen.scrpe.Task.Factory.ScrapeTaskFactoryV3Gpt; import com.yen.scrpe.Task.PokemonCollectTask; import com.yen.scrpe.Task.Factory.ScrapeTaskFactory; +import com.yen.scrpe.Task.PokemonCollectTaskRxJavaGpt; import com.yen.scrpe.Task.PokemonCollectTaskV3Gpt; import com.yen.scrpe.service.ScrapeService; import com.yen.scrpe.service.ScrapeServiceMultiThreadV3Gpt; +import com.yen.scrpe.service.ScrapeServiceRxJavaGpt; import java.io.IOException; @@ -20,7 +23,7 @@ public static void main(String[] args) throws IOException, InterruptedException Long start = System.currentTimeMillis(); // to limit the number to scrape to 5 - int LIMIT = 50; // 50; + int LIMIT = 5; // 50; /** V1 : single thread (original code ) */ // ScrapeService scrapeService = new ScrapeService(); @@ -31,11 +34,11 @@ public static void main(String[] args) throws IOException, InterruptedException // scrapeTaskFactory.run(); /** V3 : multi thread (gpt) */ - ScrapeServiceMultiThreadV3Gpt scrapeService = new ScrapeServiceMultiThreadV3Gpt(); - PokemonCollectTaskV3Gpt pokemonCollectTask = new PokemonCollectTaskV3Gpt(scrapeService); - - ScrapeTaskFactoryV3Gpt scrapeTaskFactory = new ScrapeTaskFactoryV3Gpt(scrapeService, pokemonCollectTask, LIMIT); - scrapeTaskFactory.run(); +// ScrapeServiceMultiThreadV3Gpt scrapeService = new ScrapeServiceMultiThreadV3Gpt(); +// PokemonCollectTaskV3Gpt pokemonCollectTask = new PokemonCollectTaskV3Gpt(scrapeService); +// +// ScrapeTaskFactoryV3Gpt scrapeTaskFactory = new ScrapeTaskFactoryV3Gpt(scrapeService, pokemonCollectTask, LIMIT); +// scrapeTaskFactory.run(); @@ -59,6 +62,16 @@ public static void main(String[] args) throws IOException, InterruptedException // pokemonCollectTask.getPokemonProducts().size()); // System.out.println("pokemonProducts = " + pokemonCollectTask.getPokemonProducts()); + + + /** V3 : RX JAVA (gpt) */ + ScrapeServiceRxJavaGpt scrapeService = new ScrapeServiceRxJavaGpt(); + PokemonCollectTaskRxJavaGpt pokemonCollectTask = new PokemonCollectTaskRxJavaGpt(scrapeService); + pokemonCollectTask.run(LIMIT); + + ScrapeTaskFactoryRxJavaGpt scrapeTaskFactory = new ScrapeTaskFactoryRxJavaGpt(scrapeService, pokemonCollectTask, LIMIT); + scrapeTaskFactory.run(); + Long end = System.currentTimeMillis(); System.out.println("-----> Total duration = " + (end - start)); } diff --git a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/Factory/ScrapeTaskFactoryRxJavaGpt.java b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/Factory/ScrapeTaskFactoryRxJavaGpt.java new file mode 100644 index 00000000..ec418944 --- /dev/null +++ b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/Factory/ScrapeTaskFactoryRxJavaGpt.java @@ -0,0 +1,37 @@ +package com.yen.scrpe.Task.Factory; + +import com.yen.scrpe.Task.BaseScrapeTask; +import com.yen.scrpe.service.BaseScrapeService; +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; + +public class ScrapeTaskFactoryRxJavaGpt { + + // Attributes + private BaseScrapeService scrapeService; + private BaseScrapeTask scrapeTask; + private int limit; + + public ScrapeTaskFactoryRxJavaGpt() {} + + public ScrapeTaskFactoryRxJavaGpt( + BaseScrapeService scrapeService, BaseScrapeTask scrapeTask, Integer limit) { + this.scrapeService = scrapeService; + this.scrapeTask = scrapeTask; + this.limit = limit; + } + + // Methods + public Single run() { + return Single.create( + emitter -> { + try { + scrapeTask.run(limit); + emitter.onSuccess(null); + } catch (IOException | InterruptedException e) { + emitter.onError(e); + } + }); + } + +} diff --git a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/PokemonCollectTaskRxJavaGpt.java b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/PokemonCollectTaskRxJavaGpt.java new file mode 100644 index 00000000..1d1016d6 --- /dev/null +++ b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/Task/PokemonCollectTaskRxJavaGpt.java @@ -0,0 +1,92 @@ +package com.yen.scrpe.Task; + +import com.yen.scrpe.model.PokemonProduct; +import com.yen.scrpe.service.BaseScrapeService; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class PokemonCollectTaskRxJavaGpt implements BaseScrapeTask { + + // Attributes + private BaseScrapeService scrapeService; + private List pokemonProducts; + private Set pagesDiscovered; + private List pagesToScrape; + + // Constructors + public PokemonCollectTaskRxJavaGpt() {} + + public PokemonCollectTaskRxJavaGpt(BaseScrapeService scrapeService) { + this.scrapeService = scrapeService; + this.pokemonProducts = new ArrayList<>(); + this.pagesDiscovered = new HashSet<>(); + this.pagesToScrape = new ArrayList<>(); + + // Initialize the scraping queue + this.pagesToScrape.add("https://scrapeme.live/shop/page/1/"); + this.pagesToScrape.add("https://scrapeme.live/shop/page/2/"); + this.pagesToScrape.add("https://scrapeme.live/shop/page/50/"); + } + + // Getter and Setter methods + public BaseScrapeService getScrapeService() { + return scrapeService; + } + + public void setScrapeService(BaseScrapeService scrapeService) { + this.scrapeService = scrapeService; + } + + public List getPokemonProducts() { + return pokemonProducts; + } + + public void setPokemonProducts(List pokemonProducts) { + this.pokemonProducts = pokemonProducts; + } + + public Set getPagesDiscovered() { + return pagesDiscovered; + } + + public void setPagesDiscovered(Set pagesDiscovered) { + this.pagesDiscovered = pagesDiscovered; + } + + public List getPagesToScrape() { + return pagesToScrape; + } + + public void setPagesToScrape(List pagesToScrape) { + this.pagesToScrape = pagesToScrape; + } + + // Method + public void run(int limit) { + Observable.fromIterable(pagesToScrape) + .take(limit) + .flatMap(pageUrl -> scrapePage(pageUrl) + .subscribeOn(Schedulers.io()) + .doOnNext(product -> pagesDiscovered.add(pageUrl))) + .toList(); + //return null; + } + + private Observable scrapePage(String pageUrl) { + return Observable.create(emitter -> { + try { + scrapeService.scrapeProductPage(pokemonProducts, pagesDiscovered, pagesToScrape, pagesToScrape.indexOf(pageUrl)); + pokemonProducts.forEach(emitter::onNext); + emitter.onComplete(); + } catch (IOException e) { + emitter.onError(e); + } + }); + } +} \ No newline at end of file diff --git a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJava.java b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJava.java new file mode 100644 index 00000000..f8607db7 --- /dev/null +++ b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJava.java @@ -0,0 +1,133 @@ +package com.yen.scrpe.service; + +import com.yen.scrpe.model.PokemonProduct; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import org.jsoup.Jsoup; +import org.jsoup.nodes.Document; +import org.jsoup.nodes.Element; +import org.jsoup.select.Elements; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class ScrapeServiceRxJava implements BaseScrapeService{ + + // attr + private final String BASE_URL = "https://scrapeme.live/shop"; + + // constructor + public ScrapeServiceRxJava() {} + + @Override + public void scrapeProductPage(List pokemonProducts, Set pagesDiscovered, List pagesToScrape, Integer i) throws IOException, InterruptedException { + + System.out.println( + ">>> (scrapeProductPage) pagesDiscovered = " + + pagesDiscovered + + " pagesToScrape = " + + pagesToScrape); + + // the current web page is about to be scraped and + // should no longer be part of the scraping queue + String url = pagesToScrape.remove(0); + pagesDiscovered.add(url); + Document doc = this.prepareConnect(i); + + Elements paginationElements = doc.select("a.page-numbers"); + Elements products = doc.select("li.product"); + + Flux pagesToScrape_ = Flux.fromIterable(pagesToScrape); + + // TODO : fix below +// pagesToScrape_ = this.collectToScrape(paginationElements, pagesToScrape_, pagesDiscovered); +// pokemonProducts = this.collectProductData(products, pokemonProducts); + + } + + + private Document prepareConnect(int pageNum) throws IOException { + + String URL = this.BASE_URL + "/page/" + pageNum; + System.out.println("URL = " + URL); + return Jsoup.connect(URL) + .userAgent( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36") + .header("Accept-Language", "*") + .get(); + } + + private Flux collectToScrape( + Elements paginationElements, Flux pagesToScrape, Set pagesDiscovered) { + + List toScrapeList = pagesToScrape.toStream().collect(Collectors.toList()); + + // iterating over the pagination HTML elements + for (Element pageElement : paginationElements) { + + // System.out.println(">>> pageElement = " + pageElement.text()); + // the new link discovered + String pageUrl = pageElement.attr("href"); + + // if the web page discovered is new and should be scraped +// if (!pagesDiscovered.contains(pageUrl) && !pagesToScrape.contains(pageUrl)) { +// pagesToScrape.add(pageUrl); +// } + + if (!pagesDiscovered.contains(pageUrl) && !toScrapeList.contains(pageUrl)) { + //pagesToScrape.add(pageUrl); + toScrapeList.add(pageUrl); + } + + + // adding the link just discovered + // to the set of pages discovered so far + pagesDiscovered.add(pageUrl); + } + +// Flux.just(toScrapeList.stream().flatMap(x -> { +// return x.; +// }).collect(Collectors.toList()); + + + /** NOTE !!! vai Flux.fromIterable, + * + * we can transform List to Flux + */ + pagesToScrape = Flux.fromIterable(toScrapeList); + + return pagesToScrape; + } + + private Flux collectProductData( + Elements products, List pokemonProducts) { + + for (Element product : products) { + PokemonProduct pokemonProduct = this.enrichProduct(product); + pokemonProducts.add(pokemonProduct); + } + + return Flux.fromIterable(pokemonProducts); + } + + private PokemonProduct enrichProduct(Element product) { + + // collect data + PokemonProduct pokemonProduct = new PokemonProduct(); + + // extracting the data of interest from the product HTML element + // and storing it in pokemonProduct + pokemonProduct.setUrl(product.selectFirst("a").attr("href")); + pokemonProduct.setImage(product.selectFirst("img").attr("src")); + pokemonProduct.setName(product.selectFirst("h2").text()); + pokemonProduct.setPrice(product.selectFirst("span").text()); + + return pokemonProduct; + } + +} diff --git a/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJavaGpt.java b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJavaGpt.java new file mode 100644 index 00000000..e1f523bc --- /dev/null +++ b/dev_projects/ScrapingService/src/main/java/com/yen/scrpe/service/ScrapeServiceRxJavaGpt.java @@ -0,0 +1,135 @@ +package com.yen.scrpe.service; + +import com.yen.scrpe.model.PokemonProduct; +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import org.jsoup.Jsoup; +import org.jsoup.nodes.Document; +import org.jsoup.nodes.Element; +import org.jsoup.select.Elements; +import reactor.core.publisher.Flux; + +//import javax.lang.model.util.Elements; + +public class ScrapeServiceRxJavaGpt implements BaseScrapeService { + + private final String BASE_URL = "https://scrapeme.live/shop"; + + public ScrapeServiceRxJavaGpt() {} + + @Override + public void scrapeProductPage( + List pokemonProducts, + Set pagesDiscovered, + List pagesToScrape, + Integer i) + throws IOException, InterruptedException { + + System.out.println( + ">>> (scrapeProductPage) pagesDiscovered = " + + pagesDiscovered + + " pagesToScrape = " + + pagesToScrape); + + // the current web page is about to be scraped and + // should no longer be part of the scraping queue + String url = pagesToScrape.remove(0); + pagesDiscovered.add(url); + Single doc = this.prepareConnect(i); + + // TODO : optimize below + Elements paginationElements = doc.blockingGet().select("a.page-numbers"); + Elements products = doc.blockingGet().select("li.product"); + + pagesToScrape = this.collectToScrape(paginationElements, pagesToScrape, pagesDiscovered).blockingGet(); + pokemonProducts = this.collectProductData(products, pokemonProducts); + + } + + private Single prepareConnect(int pageNum) { + + // V1 + // return Single.fromCallable(() -> { + // String URL = this.BASE_URL + "/page/" + pageNum; + // System.out.println("URL = " + URL); + // return Jsoup.connect(URL) + // .userAgent( + // "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 + // (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36") + // .header("Accept-Language", "*") + // .get(); + // }); + + // V2 + return Single.fromCallable( + // Callable: https://blog.csdn.net/u010784887/article/details/79320856 + new Callable() { + @Override + public Document call() throws Exception { + String BASE_URL = "https://scrapeme.live/shop"; + String URL = BASE_URL + "/page/" + pageNum; + System.out.println("URL = " + URL); + return Jsoup.connect(URL) + .userAgent( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36") + .header("Accept-Language", "*") + .get(); + } + }); + } + + private Single> collectToScrape( + Elements paginationElements, List pagesToScrape, Set pagesDiscovered) { + + @NonNull Observable> x = Observable.fromIterable(paginationElements) + .map(pageElement -> { + String pageUrl = pageElement.attr("href"); + + // Add new page URLs to the pagesToScrape list if not already discovered + if (!pagesDiscovered.contains(pageUrl) && !pagesToScrape.contains(pageUrl)) { + pagesToScrape.add(pageUrl); + } + + // Add the page to the discovered pages + pagesDiscovered.add(pageUrl); + return pagesToScrape; + }); + + /** via singleOrError, can transform + * Observable> to Single> + */ + return x.singleOrError(); + } + + private List collectProductData( + Elements products, List pokemonProducts) { + + for (Element product : products) { + PokemonProduct pokemonProduct = this.enrichProduct(product); + pokemonProducts.add(pokemonProduct); + } + + return pokemonProducts; + } + + private PokemonProduct enrichProduct(Element product) { + + // collect data + PokemonProduct pokemonProduct = new PokemonProduct(); + + // extracting the data of interest from the product HTML element + // and storing it in pokemonProduct + pokemonProduct.setUrl(product.selectFirst("a").attr("href")); + pokemonProduct.setImage(product.selectFirst("img").attr("src")); + pokemonProduct.setName(product.selectFirst("h2").text()); + pokemonProduct.setPrice(product.selectFirst("span").text()); + + return pokemonProduct; + } + +}