Handle multi-Tasks in input plugins by |ServiceDataSplitter| and related classes. #76

Merged
merged 3 commits into from
Apr 19, 2017

Conversation

dmikurube
Member

@muga Kind of WIP, but some of it compiles.

@muga
Contributor

muga commented Apr 14, 2017

@dmikurube The framework looks good to me 👍 Two comments for now.

  • The role of ServiceDataDispatcher is to split the input task, so I think that InputTaskSplitter or something similar is a better name. How about this?
  • Can you add examples of how to use this? DefaultServiceDataDispatcherBuilder looks good to me, but it's too simple. Users still might not know how to use it.

@dmikurube
Member Author

dmikurube commented Apr 17, 2017

@muga Thanks.

> The role of ServiceDataDispatcher is to split the input task, so I think that InputTaskSplitter or something similar is a better name. How about this?

Good catch. It is not actually a dispatcher. :) Will rename.

> Can you add examples of how to use this? DefaultServiceDataDispatcherBuilder looks good to me, but it's too simple. Users still might not know how to use it.

Of course! Will do.

…ters in |hintPerTask|.

It refactors the inner class of |DefaultServiceDataSplitterBuilder| out as |DefaultServiceDataSplitter|.
@dmikurube dmikurube changed the title from "Handle multi-Tasks in input plugins by |ServiceDataDispatchable| and related classes." to "Handle multi-Tasks in input plugins by |ServiceDataSplitter| and related classes." on Apr 17, 2017
@dmikurube
Member Author

@muga Addressed the former of your comments in 19d2fbd.

I then thought about adding examples, but I found that ServiceDataSplitter will be really target-specific. It is a bit hard to provide a good general example such as JacksonTaskReportRecordBuffer for output...

To provide an example, we may need to assume a certain specific web service and make a plugin for it. Or do you have any good ideas for examples?

@muga
Contributor

muga commented Apr 17, 2017

I have one example for REST API based input plugins. Several external REST APIs support exporting account data filtered by a time range such as '2000-01-01' <= created_at < '2017-01-01'. If the range is too large, ingesting the data might take a long time, because the ingestion runs in a single input task and that task is executed by one thread. If we divide the large time range into monthly sub-ranges, the resulting smaller input tasks are executed in parallel by multiple threads. How about this?
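
The monthly split described above could be sketched roughly as follows. This is only an illustration in plain `java.time`, not part of this PR: the class `MonthlyRangeSketch`, the `DateRange` type, and `splitMonthly` are hypothetical names, and the range is assumed to be half-open, `[begin, end)`.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the library discussed in this PR.
final class MonthlyRangeSketch {
    /** A half-open date range [begin, end). */
    static final class DateRange {
        final LocalDate begin;
        final LocalDate end;

        DateRange(LocalDate begin, LocalDate end) {
            this.begin = begin;
            this.end = end;
        }
    }

    /** Splits [begin, end) at calendar-month boundaries. */
    static List<DateRange> splitMonthly(LocalDate begin, LocalDate end) {
        List<DateRange> ranges = new ArrayList<>();
        LocalDate cursor = begin;
        while (cursor.isBefore(end)) {
            // First day of the following month, capped at the overall end.
            LocalDate next = cursor.withDayOfMonth(1).plusMonths(1);
            if (next.isAfter(end)) {
                next = end;
            }
            ranges.add(new DateRange(cursor, next));
            cursor = next;
        }
        return ranges;
    }
}
```

Each resulting `DateRange` would correspond to one smaller input task. How the sub-range is turned into an actual API request remains specific to the service.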

About the name of the ServiceDataSplitter class: it's fine with me if the current ServiceDataSplitter gets new methods that help with splitting data (splitToTasks and hintPerTask sound like task splitting to me). But I'm still confused about why we need to split data. This feature enables splitting an input task, and then Embulk executes data ingestion according to the split input tasks, doesn't it?

For output plugins, I thought that this feature doesn't need to care about them, because an output plugin's PageOutput objects are executed in parallel if multiple input tasks are executed in parallel. But the design is still in progress, so please share if you have any good ideas for output plugins.

@dmikurube
Member Author

dmikurube commented Apr 18, 2017

I understand the datetime example conceptually, but how do we implement that as a concrete practical example? There are many kinds of datetime-based services in the world.

Plugins will work for actual data in the target service. In many cases, the plugin needs to get the actual range of datetime at first, and then split. Even if the configuration is for 2014-01-01 till 2016-12-31, should we still split the input per month if the actual data is only in 2015-04?

Also, service A may explicitly take the beginning and the ending as dates in UTC formatted as "20170418". Service B may take dates/times with timezones formatted as "Apr 18, 2017 4:12pm". There are many variations.

Finally, even if we assume the entire range is given from the configuration, do we know which parameter contains the range? We can pass just Task or TaskSource to the splitter. The splitter must be aware of which parameters to read.

What makes a good example: that's my question. It will depend on the very specific service. It cannot be general. Examples need assumptions about how the service deals with datetimes. (I mentioned "output" just for comparison. JacksonTaskReportRecordBuffer didn't need assumptions about any specific service.)


For the name ServiceDataSplitter, we are definitely "splitting the entire data to be input into a few portions for tasks so that multiple tasks can ingest independently in parallel", aren't we?

"What to split" is the entire data. We're splitting the entire data to (for) tasks. We're not splitting a big Embulk Task into smaller Embulk Tasks.

@muga
Contributor

muga commented Apr 18, 2017

I agree with you, and I also don't think that we can provide a general example. But it's OK with me to provide an example for a specific service. By seeing it, plugin developers could at least understand how to split a datetime range. It's also OK with me not to prepare an example for now. If we find a good example, we can write it up then.

> Plugins will work for actual data in the target service. In many cases, the plugin needs to get the actual range of datetime at first, and then split. Even if the configuration is for 2014-01-01 till 2016-12-31, should we still split the input per month if the actual data is only in 2015-04?

In my example above, yes, a user needs to split the datetime range per month. As another idea for this case, we can split the datetime range on a daily basis and spread 2015-04 across all tasks to avoid a kind of skew issue.

input_task_0: [2015-04-01, 2015-04-11, 2015-04-21, ...]
input_task_1: [2015-04-02, 2015-04-12, 2015-04-22, ...]
... ...
input_task_9: [2015-04-10, 2015-04-20, 2015-04-30, ...]
# 0, 1, .. 9 are task indices.
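
A rough sketch of this round-robin assignment, assuming a half-open day range `[begin, end)` and a fixed task count. The class and method names (`RoundRobinDaysSketch`, `daysForTask`) are made up for illustration and are not part of this PR.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the library discussed in this PR.
final class RoundRobinDaysSketch {
    /**
     * Returns the days in [begin, end) handled by the given task:
     * the day at offset taskIndex, then every taskCount-th day after it.
     */
    static List<LocalDate> daysForTask(LocalDate begin, LocalDate end, int taskCount, int taskIndex) {
        if (taskCount <= 0 || taskIndex < 0 || taskIndex >= taskCount) {
            throw new IllegalArgumentException("invalid taskCount/taskIndex");
        }
        List<LocalDate> days = new ArrayList<>();
        for (LocalDate day = begin.plusDays(taskIndex); day.isBefore(end); day = day.plusDays(taskCount)) {
            days.add(day);
        }
        return days;
    }
}
```

With `begin = 2015-04-01`, `end = 2015-05-01`, and 10 tasks, task index 0 gets 2015-04-01, 2015-04-11, and 2015-04-21, and task index 9 gets 2015-04-10, 2015-04-20, and 2015-04-30, which matches the listing above.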

> Also, service A may explicitly take the beginning and the ending as dates in UTC formatted as "20170418". Service B may take dates/times with timezones formatted as "Apr 18, 2017 4:12pm". There are many variations.

Yes, I think that plugins need to have that calculation. The base library should not support it.
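
As a sketch of what such plugin-side handling might look like, the two formats quoted above can be expressed with standard `java.time` formatters. The class name and the "service A" / "service B" constants are hypothetical, and the example string for service B carries no timezone, so this sketch parses it as a local date-time only.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

// Hypothetical plugin-side helper, not part of the base library.
final class ServiceDateFormatSketch {
    // "Service A" style: compact date such as "20170418".
    static final DateTimeFormatter SERVICE_A = DateTimeFormatter.BASIC_ISO_DATE;

    // "Service B" style: such as "Apr 18, 2017 4:12pm".
    // Case-insensitive parsing is enabled so that lowercase "pm" is accepted.
    static final DateTimeFormatter SERVICE_B = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .appendPattern("MMM d, yyyy h:mma")
            .toFormatter(Locale.ENGLISH);

    static String formatForServiceA(LocalDate date) {
        return date.format(SERVICE_A);
    }

    static LocalDateTime parseFromServiceB(String text) {
        return LocalDateTime.parse(text, SERVICE_B);
    }
}
```

Keeping these formatters inside each plugin matches the point above: the base library only needs to deal with task counts and task indices, not with any service's date syntax.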

> Finally, even if we assume the entire range is given from the configuration, do we know which parameter contains the range? We can pass just Task or TaskSource to the splitter. The splitter must be aware of which parameters to read.

In my understanding, splitToTasks splits the entire range into sub-ranges for input tasks and stores the sub-ranges in the task source. hintPerTask overwrites the entire range with the task's sub-range (although we don't need to overwrite the entire range).

For example, if we have the following Embulk config:

in:
  time_range: [2014-01-01, 2017-01-01)

then the transaction method calls splitToTasks. It splits the range into sub time ranges and assigns them to input tasks. The task source will be the following:

in:
  time_range: [2014-01-01, 2017-01-01)
  sub_time_range: [[2014-01, 2014-12], [2015-01, 2015-12], ...]

Here the sub time range [2014-01, 2014-12] is assigned to the input task associated with task index 0, and [2015-01, 2015-12] is for task index 1.

So, hintPerTask overwrites the original time range for each input task according to its task index, like:

in:
  time_range: [2014-01, 2014-12] # for the input task associated with task index 0
  sub_time_range: [[2014-01, 2014-12], [2015-01, 2015-12], ...]

How about this? It seems that the role of hintPerTask is not so important. If you think that it's not necessary, please remove it from the ServiceDataSplitter interface.

@dmikurube
Member Author

@muga Okay, we won't have such an example very soon. Examples are to be considered later. #78

For the usage:

> In my understanding, splitToTasks splits the entire range into sub-ranges for input tasks and stores the sub-ranges in the task source. hintPerTask overwrites the entire range with the task's sub-range (although we don't need to overwrite the entire range).

It can be implemented like that, but that is not my intention. We've seen memory consumption issues from listing all indexes in input-s3. We don't want to embed all sub-indexes in the global TaskSource, right? splitToTasks and hintPerTask are intended 1) not to embed sub-indexes in Task and 2) to share the sub-index calculation.

For your case:

in:
  time_range: [2014-01-01, 2017-01-01)

ServiceDataSplitter can be implemented without any new "global" value embedded as follows:

public int splitToTasks(TaskSource taskSourceToHint) {
    final Timestamp begin = taskSourceToHint....;
    final Timestamp end = taskSourceToHint....;
    return end.getYearMonth() - begin.getYearMonth() + 1;  // Assume getYearMonth() works as imagined.
}
public void hintPerTask(TaskSource taskSourceToHint, Schema schema, int taskIndex) {
    final Timestamp begin = taskSourceToHint....;
    taskSourceToHint.setSubRange(begin.getYearMonth() + taskIndex,
                                 begin.getYearMonth() + taskIndex + 1);
}

Or, just to share the range size:

public int splitToTasks(TaskSource taskSourceToHint) {
    final Timestamp begin = taskSourceToHint....;
    final Timestamp end = taskSourceToHint....;
    taskSourceToHint.setRangeSize(MONTH);
    return (end - begin) / MONTH + 1;  // NOTE: it's not perfectly correct in case % MONTH == 0.
}
public void hintPerTask(TaskSource taskSourceToHint, Schema schema, int taskIndex) {
    final Timestamp begin = taskSourceToHint....;
    final TimeDiff rangeSize = taskSourceToHint.getRangeSize();
    taskSourceToHint.setSubRange(begin + (taskIndex * rangeSize),
                                 begin + (taskIndex + 1) * rangeSize);
}

hintPerTask is important to allow developers to optimize.
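
For reference, the first variant above could be written with `java.time.YearMonth` along these lines. This is only a sketch under stated assumptions: it does not use the real TaskSource API, the class `StatelessMonthSplitSketch` and the method `subRangeFor` are hypothetical, and the end month is treated as exclusive (matching `[2014-01-01, 2017-01-01)`), so there is no `+ 1`.

```java
import java.time.YearMonth;
import java.time.temporal.ChronoUnit;

// Hypothetical stand-in for a month-based splitter; not the real interface.
final class StatelessMonthSplitSketch {
    /** Number of tasks: one per month in [begin, endExclusive). */
    static int splitToTasks(YearMonth begin, YearMonth endExclusive) {
        return (int) ChronoUnit.MONTHS.between(begin, endExclusive);
    }

    /**
     * Sub-range for one task, computed from the task index alone:
     * element 0 is the task's month, element 1 is the exclusive end.
     */
    static YearMonth[] subRangeFor(YearMonth begin, int taskIndex) {
        YearMonth first = begin.plusMonths(taskIndex);
        return new YearMonth[] {first, first.plusMonths(1)};
    }
}
```

Nothing per-task is stored globally here: the sub-range depends only on `begin` and `taskIndex`, so the same task index always maps to the same month.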

@muga
Contributor

muga commented Apr 19, 2017

> It can be implemented like that, but that is not my intention. We've seen memory consumption issues from listing all indexes in input-s3. We don't want to embed all sub-indexes in the global TaskSource, right?

Yes, your idea works as long as the same input task always gets the same datetime sub-range, because input tasks are sometimes retried on Hadoop when using the MapReduce executor. In your case, hintPerTask is important.

I think that you can merge this PR 👍

@dmikurube
Member Author

Thanks! Merging this PR, and releasing v0.5.0 soon. Let's start (experimental) implementation with ServiceDataSplitter. :)

@dmikurube dmikurube merged commit dfab169 into master Apr 19, 2017
@muga
Contributor

muga commented Apr 19, 2017

Yes, it's good to me 👍

@dmikurube dmikurube deleted the input-in-parallel branch May 10, 2017 07:16
2 participants