-
Notifications
You must be signed in to change notification settings - Fork 19
support datasource for github archive #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can consider adding an examples
section to show how the entire workflow is used.
uri_info = url; | ||
} else { | ||
if Path::new(uri).is_absolute() { | ||
uri_info = Url::from_file_path(uri).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not recommended to use the unwrap
method. If you use it, it'll be difficult to find the root cause when something goes wrong.
} | ||
|
||
impl GHarchiveSource { | ||
pub fn new(uri: &str) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this part supports both URLs and HTTP requests simultaneously, the functionality should be more streamlined.
For example, setting HTTP as the default option, and then providing a from_file
method to handle local files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanna support URI(Uniform Resource Identifier), http res and local file are both URI .
so I use one new() method .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, this will make the code complexity too high.
Alternatively, a dedicated resource processing module could be provided.
In that case, the new()
method would seem easier to understand.
With the current implementation, it will be very difficult for oneself or others to understand the code after several months.
It is highly recommended to keep the functionality single - purposed. This makes it easier to further expand in the future. For example, is there a possibility that these files will appear in engines like S3 in the future? Or perhaps when retrieving these files from certain data sources, authentication is required.
let scheme = self.uri.scheme(); | ||
if scheme == "http" || scheme == "https" { | ||
let client = reqwest::Client::builder() | ||
.timeout(std::time::Duration::from_secs(10)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a good idea to hard - code the value as 10 seconds here.
It's recommended to make it a configuration so that users can set an appropriate value externally.
The parameter can default to 10 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no problem
|
||
#[async_trait] | ||
impl Source<Event> for GHarchiveSource { | ||
async fn init(&mut self) -> StreamResult<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The content of this function is too long. It is recommended to split it into two functions for processing. This way, it will be clearer to read.
For example,
if scheme == "http" || scheme == "https" {
init_http();
} else if scheme == "file" {
init_file();
}
/* | ||
archive file downloaded from https://data.gharchive.org/ must be split by CRLF | ||
*/ | ||
async fn next(&mut self) -> StreamResult<Option<Record<Event>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part is a bit wordy. It is recommended to change it to a form similar to the following.
self.reader
.as_mut()
.map_or_else(
|| Ok(None),
|reader| async {
let mut line = String::new();
let read_result = reader.read_line(&mut line).await?;
if read_result == 0 {
return Ok(None);
}
let event: Event = serde_json::from_str(&line).unwrap();
Ok(Some(Record::new(event)))
}.await
)
@@ -1,6 +1,11 @@ | |||
pub mod csv; | |||
pub mod generator; | |||
|
|||
#[cfg(feature = "enable-gharchive")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The enable-
prefix can be removed.
The issue of retrying needs to be considered, as HTTP requests may fail. |
Certainly, for the current implementation of gharchive, it actually only supports the HTTP method. You could even consider removing the file - related functionality first. It's advisable not to over - design. Instead, more thought should be given to how to solve the current timeout issue of gharchive, as gharchive has many limitations at the network level. |
It would be better to complete the HTTP part in an independent issue #58 . Let's not deal with the retry problem here for now. After the HTTP issue is implemented later, we can come back to replace the relevant logic here. So, please delete the file functionality first, or just handle it in a Currently, there is a user directly making an HTTP request, and there is no need for the file reading method for now. |
I fixed the problem you required , add a new PR : #70 |
run test:
cargo test -p fluxus-sources --features enable-gharchive test_local_source -- --nocapture
cargo test -p fluxus-sources --features enable-gharchive test_http_source -- --nocapture