Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scraping-dev-002-rxJava - Rewrite service with Rx Java #3

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion dev_projects/ScrapingService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,20 @@
<scope>test</scope>
</dependency>

</dependencies>
<!-- Rx Java -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.5</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.yen.scrpe;

import com.yen.scrpe.Task.Factory.ScrapeTaskFactoryRxJavaGpt;
import com.yen.scrpe.Task.Factory.ScrapeTaskFactoryV3Gpt;
import com.yen.scrpe.Task.PokemonCollectTask;
import com.yen.scrpe.Task.Factory.ScrapeTaskFactory;
import com.yen.scrpe.Task.PokemonCollectTaskRxJavaGpt;
import com.yen.scrpe.Task.PokemonCollectTaskV3Gpt;
import com.yen.scrpe.service.ScrapeService;
import com.yen.scrpe.service.ScrapeServiceMultiThreadV3Gpt;
import com.yen.scrpe.service.ScrapeServiceRxJavaGpt;

import java.io.IOException;

Expand All @@ -20,7 +23,7 @@ public static void main(String[] args) throws IOException, InterruptedException
Long start = System.currentTimeMillis();

// to limit the number to scrape to 5
int LIMIT = 50; // 50;
int LIMIT = 5; // 50;

/** V1 : single thread (original code ) */
// ScrapeService scrapeService = new ScrapeService();
Expand All @@ -31,11 +34,11 @@ public static void main(String[] args) throws IOException, InterruptedException
// scrapeTaskFactory.run();

/** V3 : multi thread (gpt) */
ScrapeServiceMultiThreadV3Gpt scrapeService = new ScrapeServiceMultiThreadV3Gpt();
PokemonCollectTaskV3Gpt pokemonCollectTask = new PokemonCollectTaskV3Gpt(scrapeService);

ScrapeTaskFactoryV3Gpt scrapeTaskFactory = new ScrapeTaskFactoryV3Gpt(scrapeService, pokemonCollectTask, LIMIT);
scrapeTaskFactory.run();
// ScrapeServiceMultiThreadV3Gpt scrapeService = new ScrapeServiceMultiThreadV3Gpt();
// PokemonCollectTaskV3Gpt pokemonCollectTask = new PokemonCollectTaskV3Gpt(scrapeService);
//
// ScrapeTaskFactoryV3Gpt scrapeTaskFactory = new ScrapeTaskFactoryV3Gpt(scrapeService, pokemonCollectTask, LIMIT);
// scrapeTaskFactory.run();



Expand All @@ -59,6 +62,16 @@ public static void main(String[] args) throws IOException, InterruptedException
// pokemonCollectTask.getPokemonProducts().size());
// System.out.println("pokemonProducts = " + pokemonCollectTask.getPokemonProducts());



/** V3 : RX JAVA (gpt) */
ScrapeServiceRxJavaGpt scrapeService = new ScrapeServiceRxJavaGpt();
PokemonCollectTaskRxJavaGpt pokemonCollectTask = new PokemonCollectTaskRxJavaGpt(scrapeService);
pokemonCollectTask.run(LIMIT);

ScrapeTaskFactoryRxJavaGpt scrapeTaskFactory = new ScrapeTaskFactoryRxJavaGpt(scrapeService, pokemonCollectTask, LIMIT);
scrapeTaskFactory.run();

Long end = System.currentTimeMillis();
System.out.println("-----> Total duration = " + (end - start));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.yen.scrpe.Task.Factory;

import com.yen.scrpe.Task.BaseScrapeTask;
import com.yen.scrpe.service.BaseScrapeService;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;

public class ScrapeTaskFactoryRxJavaGpt {

// Attributes
private BaseScrapeService scrapeService;
private BaseScrapeTask scrapeTask;
private int limit;

public ScrapeTaskFactoryRxJavaGpt() {}

public ScrapeTaskFactoryRxJavaGpt(
BaseScrapeService scrapeService, BaseScrapeTask scrapeTask, Integer limit) {
this.scrapeService = scrapeService;
this.scrapeTask = scrapeTask;
this.limit = limit;
}

// Methods
public Single<Void> run() {
return Single.create(
emitter -> {
try {
scrapeTask.run(limit);
emitter.onSuccess(null);
} catch (IOException | InterruptedException e) {
emitter.onError(e);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.yen.scrpe.Task;

import com.yen.scrpe.model.PokemonProduct;
import com.yen.scrpe.service.BaseScrapeService;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PokemonCollectTaskRxJavaGpt implements BaseScrapeTask {

// Attributes
private BaseScrapeService scrapeService;
private List<PokemonProduct> pokemonProducts;
private Set<String> pagesDiscovered;
private List<String> pagesToScrape;

// Constructors
public PokemonCollectTaskRxJavaGpt() {}

public PokemonCollectTaskRxJavaGpt(BaseScrapeService scrapeService) {
this.scrapeService = scrapeService;
this.pokemonProducts = new ArrayList<>();
this.pagesDiscovered = new HashSet<>();
this.pagesToScrape = new ArrayList<>();

// Initialize the scraping queue
this.pagesToScrape.add("https://scrapeme.live/shop/page/1/");
this.pagesToScrape.add("https://scrapeme.live/shop/page/2/");
this.pagesToScrape.add("https://scrapeme.live/shop/page/50/");
}

// Getter and Setter methods
public BaseScrapeService getScrapeService() {
return scrapeService;
}

public void setScrapeService(BaseScrapeService scrapeService) {
this.scrapeService = scrapeService;
}

public List<PokemonProduct> getPokemonProducts() {
return pokemonProducts;
}

public void setPokemonProducts(List<PokemonProduct> pokemonProducts) {
this.pokemonProducts = pokemonProducts;
}

public Set<String> getPagesDiscovered() {
return pagesDiscovered;
}

public void setPagesDiscovered(Set<String> pagesDiscovered) {
this.pagesDiscovered = pagesDiscovered;
}

public List<String> getPagesToScrape() {
return pagesToScrape;
}

public void setPagesToScrape(List<String> pagesToScrape) {
this.pagesToScrape = pagesToScrape;
}

// Method
public void run(int limit) {
Observable.fromIterable(pagesToScrape)
.take(limit)
.flatMap(pageUrl -> scrapePage(pageUrl)
.subscribeOn(Schedulers.io())
.doOnNext(product -> pagesDiscovered.add(pageUrl)))
.toList();
//return null;
}

private Observable<PokemonProduct> scrapePage(String pageUrl) {
return Observable.create(emitter -> {
try {
scrapeService.scrapeProductPage(pokemonProducts, pagesDiscovered, pagesToScrape, pagesToScrape.indexOf(pageUrl));
pokemonProducts.forEach(emitter::onNext);
emitter.onComplete();
} catch (IOException e) {
emitter.onError(e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.yen.scrpe.service;

import com.yen.scrpe.model.PokemonProduct;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ScrapeServiceRxJava implements BaseScrapeService{

// attr
private final String BASE_URL = "https://scrapeme.live/shop";

// constructor
public ScrapeServiceRxJava() {}

@Override
public void scrapeProductPage(List<PokemonProduct> pokemonProducts, Set<String> pagesDiscovered, List<String> pagesToScrape, Integer i) throws IOException, InterruptedException {

System.out.println(
">>> (scrapeProductPage) pagesDiscovered = "
+ pagesDiscovered
+ " pagesToScrape = "
+ pagesToScrape);

// the current web page is about to be scraped and
// should no longer be part of the scraping queue
String url = pagesToScrape.remove(0);
pagesDiscovered.add(url);
Document doc = this.prepareConnect(i);

Elements paginationElements = doc.select("a.page-numbers");
Elements products = doc.select("li.product");

Flux<String> pagesToScrape_ = Flux.fromIterable(pagesToScrape);

// TODO : fix below
// pagesToScrape_ = this.collectToScrape(paginationElements, pagesToScrape_, pagesDiscovered);
// pokemonProducts = this.collectProductData(products, pokemonProducts);

}


private Document prepareConnect(int pageNum) throws IOException {

String URL = this.BASE_URL + "/page/" + pageNum;
System.out.println("URL = " + URL);
return Jsoup.connect(URL)
.userAgent(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36")
.header("Accept-Language", "*")
.get();
}

private Flux<String> collectToScrape(
Elements paginationElements, Flux<String> pagesToScrape, Set<String> pagesDiscovered) {

List<String> toScrapeList = pagesToScrape.toStream().collect(Collectors.toList());

// iterating over the pagination HTML elements
for (Element pageElement : paginationElements) {

// System.out.println(">>> pageElement = " + pageElement.text());
// the new link discovered
String pageUrl = pageElement.attr("href");

// if the web page discovered is new and should be scraped
// if (!pagesDiscovered.contains(pageUrl) && !pagesToScrape.contains(pageUrl)) {
// pagesToScrape.add(pageUrl);
// }

if (!pagesDiscovered.contains(pageUrl) && !toScrapeList.contains(pageUrl)) {
//pagesToScrape.add(pageUrl);
toScrapeList.add(pageUrl);
}


// adding the link just discovered
// to the set of pages discovered so far
pagesDiscovered.add(pageUrl);
}

// Flux.just(toScrapeList.stream().flatMap(x -> {
// return x.;
// }).collect(Collectors.toList());


/** NOTE !!! vai Flux.fromIterable,
*
* we can transform List<String> to Flux<String>
*/
pagesToScrape = Flux.fromIterable(toScrapeList);

return pagesToScrape;
}

private Flux<PokemonProduct> collectProductData(
Elements products, List<PokemonProduct> pokemonProducts) {

for (Element product : products) {
PokemonProduct pokemonProduct = this.enrichProduct(product);
pokemonProducts.add(pokemonProduct);
}

return Flux.fromIterable(pokemonProducts);
}

private PokemonProduct enrichProduct(Element product) {

// collect data
PokemonProduct pokemonProduct = new PokemonProduct();

// extracting the data of interest from the product HTML element
// and storing it in pokemonProduct
pokemonProduct.setUrl(product.selectFirst("a").attr("href"));
pokemonProduct.setImage(product.selectFirst("img").attr("src"));
pokemonProduct.setName(product.selectFirst("h2").text());
pokemonProduct.setPrice(product.selectFirst("span").text());

return pokemonProduct;
}

}
Loading