# (PART) PIP Procedures {.unnumbered}
# Poverty Calculator Pipeline (pre-computed estimations) {#pcpipeline}
The Poverty Calculator Pipeline--hereafter, and only for the rest of this chapter, the pipeline--is the technical procedure that calculates the pre-computed (or static) estimations of the PIP project. These estimations have two main purposes:
1. To provide the user with instantaneous information about distributive measures of all the household surveys in the PIP repository that do not depend on the value of the poverty line (e.g., mean income, quantiles), thus avoiding the need for re-computation, as was the case in PovcalNet for some of these measures.
2. To provide the necessary inputs to the PIP API.
This chapter walks you through the folder structure of the repository, the main R script, `_targets.R`, and the complete and partial execution of the script. It also provides some tips for debugging.
## Folder structure
The pipeline is hosted in the Github repository [PIP-Technical-Team/pip_ingestion_pipeline](https://github.com/PIP-Technical-Team/pip_ingestion_pipeline). At the root of the repo you will find a series of files and folders.
```{r, eval=FALSE}
#> +-- batch
#> +-- pip_ingestion_pipeline.Rproj
#> +-- R
#> +-- README.md
#> +-- renv
#> +-- renv.lock
#> +-- run.R
#> +-- _packages.R
#> +-- _targets
#> \-- _targets.R
```
### Folders {.unnumbered}
- `R` contains long R functions used during the pipeline.
- `batch` contains a script for timing the execution of the pipeline. This folder should probably be removed.
- `_targets` is a folder for all objects created during the pipeline. You don't need to look inside, as its content is managed by the `targets` package.
- `renv` is the folder of the reproducible environment managed by the `{renv}` package.
### Files {.unnumbered}
- `_packages.R` is created by `targets::tar_renv()`. Do not modify manually.
- `_targets.R` contains the pipeline. This is the most important file.
## Prerequisites
Before you start working on the pipeline, make sure you have the following PIP packages installed.
> **Note 1**: the instructions below contain suffixes like `@development`. These specify the branch of the particular package that you need to use. Ideally, all packages should use the master branch; however, that will only be possible once the development process ends.
> **Note 2**: if you update any of the packages developed by the PIP team, make sure you always increase the version of the package using the function `usethis::use_version()`. Even if the change in the package is small, you need to increase the version. Otherwise, `{targets}` won't execute the sections of the pipeline that run the functions you changed.
```{r, eval=FALSE}
remotes::install_github("PIP-Technical-Team/pipdm@development")
remotes::install_github("PIP-Technical-Team/pipload@development")
remotes::install_github("PIP-Technical-Team/wbpip@halfmedian_spl")
install.packages("joyn")
```
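As a concrete illustration of Note 2 above, a minimal update workflow for one of these packages might look like the sketch below (the "patch" increment and the branch name are only examples):

```{r, eval=FALSE}
# Inside the package project (e.g. pipdm), after committing your changes:
usethis::use_version("patch")  # bump the version, even for small changes

# Reinstall the branch you are working on so the pipeline picks up the new version
remotes::install_github("PIP-Technical-Team/pipdm@development")
```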
In case `renv` is not working for you, you may need to install all the packages mentioned in the `_packages.R` script at the root of the folder. Also, make sure to install the most recent versions of the `targets` and `tarchetypes` packages.
## Structure of the `_targets.R` file
Even though the pipeline script looks like a regular R script, it is structured in a specific way in order to work with the [`{targets}`](https://books.ropensci.org/targets/) package. In fact, it must be called `_targets.R` and sit at the root of the project. It is highly recommended that you read the entire [targets manual](https://books.ropensci.org/targets/) to fully understand how it works. We will often refer to this manual to expand on particular targets concepts.
### Start up {.unnumbered}
The first part of the pipeline sets up the environment. The process involves:
1. loading the `{targets}` and `{tarchetypes}` packages;
2. creating default values like directories, time stamps, survey and reference years boundaries, compression level of .fst files, etc.;
3. executing `tar_option_set()` to set options for `{targets}`. `packages` and `imports` are two particularly important options for tracking changes in package dependencies (a sketch of such a call is shown after this list). You can read more about them in the sections [Loading and configuring R packages](https://books.ropensci.org/targets/practices.html#loading-and-configuring-r-packages) and [Packages-based invalidation](https://books.ropensci.org/targets/practices.html#packages-based-invalidation) of the targets manual;
4. attaching all the packages and functions of the project by running `source('_packages.R')` and `source('R/_common.R')`.
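The sketch below shows what a `tar_option_set()` call of this kind might look like. The package names and values are illustrative only; the real options are defined in `_targets.R`:

```{r, eval=FALSE}
# Illustrative only -- the actual call lives in _targets.R
tar_option_set(
  packages = c("data.table", "pipdm", "pipload", "wbpip"), # attached for every target
  imports  = c("pipdm", "wbpip"), # invalidate targets when functions in these packages change
  format   = "qs"                 # default storage format for target objects
)
```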
### Step 1: small functions {.unnumbered}
According to the section [Functions in pipelines](https://books.ropensci.org/targets/functions.html#functions-in-pipelines) of the targets manual, it is recommended to use functions rather than expressions during the execution. Presumably, the reason is that `{targets}` tracks changes in functions but not in expressions. Thus, this section of the script defines small functions that are executed along the pipeline. In the start-up section above, the script `source('R/_common.R')` loads longer functions. Keep in mind, however, that `'R/_common.R'` was used in a previous version of the pipeline, before `{targets}` was implemented. Now, most of the functions in `'R/_common.R'` are included in the `{pipdm}` package.
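Purely for illustration, a "small function" defined in this section could look like the sketch below; the name and body are hypothetical, not taken from the actual `_targets.R`:

```{r, eval=FALSE}
# Hypothetical helper. Because it is a named function, {targets} tracks
# changes to its body and re-runs only the targets that call it.
get_aux_names <- function(aux_tb) {
  sort(unique(aux_tb$auxname)) # assumes the `auxname` column used by tar_map() below
}
```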
### Step 2: preparing the data {.unnumbered}
[This section used to be longer in previous versions of the pipeline because it used to identify the auxiliary data, load the PIP microdata inventory, and create the cache files. It now only identifies the auxiliary data.]{style="color:red"}
### Step 3: The actual pipeline {.unnumbered}
Although better explained in the next section, the overall order of the pipeline is as follows:
1. Load all the necessary data (that is, auxiliary data and inventories), and then create any cache file that has not been created yet.
2. Calculate survey means in LCU.
3. Create the deflated survey mean (DSM) table.
4. Calculate the reference year table (a.k.a. the interpolated means table).
5. Calculate distributional stats.
6. Create output tables:
    1. join the survey mean table with the dist table;
    2. join the reference year table with the dist table;
    3. create the coverage table and the table of aggregate population at the regional level.
7. Clean and save.
## Understanding the pipeline
We must understand not only how the `{targets}` package works, but also how the targets of the Poverty Calculator Pipeline are created. For the former, you can read the targets manual. For the latter, we should start by making a distinction between the different types of targets.
In `{targets}` terminology, there are two kinds of targets, **stems** and **branches**. **Stems** are unitary targets. That is, for each target there is only one single R object. **Branches**, on the other hand, are targets that contain several objects or *subtargets* inside (you can learn more about them in the chapter [Dynamic branching](https://books.ropensci.org/targets/dynamic.html#dynamic) of the targets manual). We will see the use of this type of target when we talk about cache files.
### Stem targets {.unnumbered}
There are two ways to create **stem** targets: either using `tar_target()` or using `tar_map()` from the `{tarchetypes}` package. The `tar_map()` function allows you to create **stem** targets iteratively. See, for instance, the creation of targets for each auxiliary dataset:
```{r, eval=FALSE}
tar_map(
values = aux_tb,
names = "auxname",
# create dynamic name
tar_target(
aux_dir,
auxfiles,
format = "file"
),
tar_target(
aux,
pipload::pip_load_aux(file_to_load = aux_dir,
apply_label = FALSE)
)
)
```
`tar_map()` takes the values in the data frame `aux_tb`, created in [Step 2: preparing the data], and creates two types of targets. First, it creates the target `aux_dir`, which contains the paths of the auxiliary files, available in the column `auxfiles` of `aux_tb`. This is done by creating an internal target within `tar_map()` and using the argument `format = "file"`. This lets `{targets}` know that these objects are loaded from files and are not created inside the pc pipeline.
Then, `tar_map()` uses the column `auxname` of `aux_tb` to name the targets that contain the auxiliary files. Each target will be prefixed by the word "aux". This is why we had to add the argument `file_to_load` to `pipload::pip_load_aux()`: it lets `{targets}` know that the file paths defined in the target `aux_dir` are used to create the targets prefixed with "aux", which are the actual targets. For example, if I need to use the population data frame inside the pc pipeline, I'd use the target `aux_pop`, which has a corresponding file path in `aux_dir`. This way, if the original file referenced in `aux_dir` changes, all the targets that depend on `aux_pop` will be run again.
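Although `aux_tb` is built in [Step 2: preparing the data], a minimal sketch of its expected shape helps to see what `tar_map()` iterates over. The file paths below are placeholders, not real locations:

```{r, eval=FALSE}
# Sketch of aux_tb: one row per auxiliary dataset (paths are made up)
aux_tb <- data.frame(
  auxname  = c("pop", "cpi", "ppp"),
  auxfiles = c("Y:/aux/pop.fst", "Y:/aux/cpi.fst", "Y:/aux/ppp.fst")
)
# tar_map() then creates one pair of targets per row,
# e.g. aux_pop (and its corresponding aux_dir_* file target)
```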
### Branches targets {.unnumbered}
[Let's think of a branch target like...]{style="color:red"}
As explained above, branch targets are targets made of many "subtargets" that follow a particular pattern. Most of the targets created in the pc pipeline are **branch** targets because we need to execute the same procedure on every cache file. This could have been done internally in a single target, but then we would lose the tracking features of `{targets}`. Alternatively, we could have created a **stem** target for every cache file, result, and output file, but that would have been both impossible to visualize and more difficult to code. Hence, branch targets are the best option.
The following example illustrates how this works:
```{r, eval=FALSE}
# step A
tar_target(
cache_inventory_dir,
cache_inventory_path(),
format = "file"
),
# step B
tar_target(
cache_inventory,
{
x <- fst::read_fst(cache_inventory_dir,
as.data.table = TRUE)
}
),
# step C
tar_target(cache_files,
get_cache_files(cache_inventory)),
# step D
tar_files(cache_dir, cache_files),
# step E
tar_target(cache,
fst::read_fst(path = cache_dir,
as.data.table = TRUE),
pattern = map(cache_dir),
iteration = "list")
```
The code above illustrates several things. It is divided into steps, with the last step (step E) being the part of the code where the **branch** target is created. Yet, it is important to understand all the previous steps.
In step A we create the target `cache_inventory_dir`, which is merely the file path that contains the cache inventory. Notice that it is returned by a function and not entered directly into the target. Since it is a file path, we need to add the argument `format = "file"` to let `{targets}` know that it is input data. In step B we load the cache inventory file into the target `cache_inventory` by providing the "path" target that we created in step A. This file has several columns. One of them contains the file path of every single cache file in the PIP network drive. That single column is extracted from the cache inventory in step C. Then, in step D, each file path is declared as input, using the convenient function `tar_files()`, thus creating a new target, `cache_dir`. Finally, we create the **branch** target `cache` with all the cache files by loading each file. To do this iteratively, we pass the `cache_dir` target to the `path` argument of the function `fst::read_fst()` and to the `pattern = map()` argument of the `tar_target()` function. At the very end, we need to specify that the output of the iteration is stored as a list, using the argument `iteration = "list"`.
The basic logic of **branch** targets is that the vector or list to iterate over should be passed both to the function's argument and to the `pattern = map()` argument of the `tar_target()` function. It is very similar to `purrr::map()`.
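To make the analogy concrete, the branch target `cache` from step E above behaves, conceptually, like the `purrr` call below. This is only a mental model, not code from the pipeline:

```{r, eval=FALSE}
# Conceptual equivalent of step E: read each cache file into a list element
cache <- purrr::map(cache_dir, fst::read_fst, as.data.table = TRUE)
```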
> **Note**: if we are iterating over *more than one* vector or list, you need to (1) separate them with commas in the `map()` part of the call (see the example code below); (2) make sure all the vectors or lists **have the same length**--this is why we cannot remove NULL or NA values from any target; and (3) make sure you do **NOT** sort any of the output targets, as that would lose the correspondence with the other targets.
```{r, eval=FALSE}
# Example of creating branch target using several lists to iterate through.
tar_target(
name = dl_dist_stats,
command = db_compute_dist_stats(dt = cache,
mean = dl_mean,
pop = aux_pop,
cache_id = cache_ids),
pattern = map(cache, dl_mean, cache_ids),
iteration = "list"
)
```
### Creating the cache files {.unnumbered}
The following code illustrates the creation of cache files:
```{r, eval=FALSE}
tar_target(pipeline_inventory, {
x <- pipdm::db_filter_inventory(
dt = pip_inventory,
pfw_table = aux_pfw)
# Uncomment for specific countries
# x <- x[country_code == 'IDN' & surveyid_year == 2015]
}
),
tar_target(status_cache_files_creation,
pipdm::create_cache_file(
pipeline_inventory = pipeline_inventory,
pip_data_dir = PIP_DATA_DIR,
tool = "PC",
cache_svy_dir = CACHE_SVY_DIR,
compress = FST_COMP_LVL,
force = TRUE,
verbose = FALSE,
cpi_dt = aux_cpi,
ppp_dt = aux_ppp)
)
```
It is important to understand this part of the pc pipeline thoroughly because the cache files used to be created in [Step 2: preparing the data] rather than here. Their creation has not only been integrated into the pc pipeline, but it is also possible to execute it independently from the rest of the pipeline, by following the instructions in [Executing the \_targets.R file].
The first target, `pipeline_inventory`, is just the inner join of the PIP inventory dataset and the price framework (pfw) file, to make sure we only include what the **pfw** says. This dataset also contains a lot of useful information needed to create the cache files. Note that the commented line in this target would filter the pipeline inventory to keep only the information for IDN, 2015. [If you need to update specific cache files, you must add the proper filtering condition there]{style="color:red"}.
In the second target, `status_cache_files_creation`, you create the cache files. Notice that the value returned by the function `pipdm::create_cache_file()` is not the cache file per se, but a list with the status of the creation process. If the creation of a particular file fails, it does not stop the iteration that creates all the cache files. At the end of the process, it returns a list with the creation status of each cache file. Notice that the function `pipdm::create_cache_file()` requires the CPI and the PPP auxiliary data. That is because the variable `welfare_ppp`, which is the welfare aggregate in 2011 PPP values, is added to the cache files. Finally, and most importantly, the argument `force = TRUE` ensures that the cache file is modified even if it already exists. This is important when you need the cache files to have additional features beyond the ones they currently have. If set to `TRUE`, it will replace any file in the network drive that is listed in `pipeline_inventory`. If set to `FALSE`, only the files that are in `pipeline_inventory`--but not in the cache folder--will be created. Use `force = TRUE` only when you need to add new features to all cache data, or when you are testing and only need a few surveys with the new features.
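For instance, if you only need to re-create the cache files of a handful of surveys, the filtering condition mentioned above could look like the sketch below (column names are taken from the commented line in the target; the country codes and years are just examples):

```{r, eval=FALSE}
# Inside the pipeline_inventory target: restrict the inventory before
# the cache files are (re)created with force = TRUE
x <- x[country_code %in% c("IDN", "PHL") & surveyid_year >= 2015]
```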
## Understanding `{pipdm}` functions
The `{pipdm}` package is the backbone of the pc pipeline. It is in charge of executing the functions in `{wbpip}` and consolidating the new databases. This is why many of the functions in `{pipdm}` are prefixed with "db\_".
### Internal structure of `{pipdm}` functions
The main objective of `{pipdm}` is to execute the functions in `{wbpip}` that do the calculations and then to build the data frames. As of today (`r Sys.Date()`), the process is a little intricate.
Let's take the example of estimating distributive measures in the pipeline. The image [below](#pipdm-structure) shows that there are at least three intermediate function levels between `db_compute_dist_stats()`, which is directly executed in the pc pipeline, and `wbpip::md_compute_dist_stats()`, which makes the calculations. Also, notice that the functions are very general with regard to their output. No higher-level function is specific enough to retrieve only one measure, such as the Gini coefficient, the median, or the quantiles of the distribution. If you need to add or modify one particular distributive measure, you must do it in the functions inside `wbpip::md_compute_dist_stats()`, making sure the new output does not mess up the execution of any of the intermediate functions before the results get to `db_compute_dist_stats()`.
![](img/pipdm_structure.png){#pipdm-structure}
This long chain of functions is inflexible and makes debugging very difficult. So, if you need to make any modification, first identify the chain of execution of each `pipdm` function you modify, and then make sure your changes do not affect the output format, as that may break the execution chain. This is also a good example of why this structure needs to be improved.
### Updating `{pipdm}` (or any other PIP package)
As explained above, if you need to modify any function in `pipdm` or in `wbpip`, you need to make sure that the output does not conflict with the execution chain. Additionally, if you update any of the packages developed by the PIP team, make sure you always increase the version of the package using the function `usethis::use_version()`. Even if the change in the package is small, you need to increase the version. Otherwise, `{targets}` won't execute the sections of the pipeline that run the functions you changed. Finally, as explained in the [Prerequisites], if you are working on a branch other than master, make sure you install that version of the package before running the pipeline.
## Executing the `_targets.R` file
The `.Rprofile` file at the root of the directory makes sure that both `{targets}` and `{tarchetypes}` are loaded when the project is started. Executing the whole pipeline can be very time consuming because it still needs to load all the data in the network drive. If you use a remote desktop connection the execution might be faster than running it locally, but it is still very time consuming. So, it is advisable to only execute the targets that are directly affected by your changes and manually check that everything looks OK. After that, you can execute the entire code confidently and leave it running overnight.
In order to execute the whole pipeline, you only need to type the command `tar_make()` in the console. If you want to execute only one target, pass the name of the target to the same command, e.g., `tar_make(dl_dist_stats)`. Keep in mind that if the inputs of any upstream targets of the requested target have changed, those targets will be executed first.
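In practice, a session could look like the sketch below. `tar_outdated()` and `tar_visnetwork()` are standard `{targets}` helpers that show what would be re-run before you commit to a full execution:

```{r, eval=FALSE}
# Check which targets are out of date without running anything
tar_outdated()

# Visualize the dependency graph and the status of each target
tar_visnetwork()

# Run a single target (outdated upstream targets are executed first)
tar_make(dl_dist_stats)

# Run the whole pipeline
tar_make()
```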
## Debugging
Debugging in targets is not easy. Yet, there are two ways to do it. The first is described in the chapter [Debugging](https://books.ropensci.org/targets/debugging.html) of the targets manual. It provides clear instructions on how to debug *while still being inside the pipeline*, but you may find that this method is not flexible enough to dig deep into the problem. Alternatively, you can debug by stepping out of the pipeline a little and gaining more flexibility, as described below.
Debugging is needed in one of two cases: either you got an error when running the pipeline with `tar_make()`, or your results look odd. In either case, you should probably have an idea--though not always--of where the problem is. If the problem is an error in the execution of the pipeline, the messages printed by `{targets}` are usually informative.
### Debugging stem targets {.unnumbered}
Let's see a simple example. Assume the problem is in the target `dt_dist_stats`, which is created by executing the function `db_create_dist_table()` of the `{pipdm}` package. Since the problem is in there, all the targets and inputs necessary to create `dt_dist_stats` should be available in the `_targets/` data store. So, you can load them using `tar_load()` and execute the function in debugging mode, like this:
```{r, eval=FALSE}
tar_load(dl_dist_stats)
tar_load(svy_mean_ppp_table)
tar_load(cache_inventory)
debugonce(pipdm::db_create_dist_table)
pipdm::db_create_dist_table(
dl = dl_dist_stats,
dsm_table = svy_mean_ppp_table,
crr_inv = cache_inventory
)
```
Note that you must use the `::` because the environment in which `{targets}` runs is different from your global environment, in which you might not have attached all the libraries.
### Debugging branch targets {.unnumbered}
The challenge of debugging branch targets is that, if the problem is in a specific survey, you can't access the "subtarget" using the survey ID or something of that nature, because the name of the subtarget is created by `{targets}` using a random number. This requires a little more work.
Imagine now that the distributive measures of IDN 2015 are wrong. You look at the pipeline and notice that these calculations are executed in the target `dl_dist_stats`, which is the branch target created over **all the cache files!** It would look something like this:
```{r, eval=FALSE}
tar_target(
name = dl_dist_stats,
command = db_compute_dist_stats(dt = cache,
mean = dl_mean,
pop = aux_pop,
cache_id = cache_ids),
pattern = map(cache, dl_mean, cache_ids),
iteration = "list"
)
```
In order to find the problem in IDN 2015, this is what you could do:
```{r, eval=FALSE}
# Load data
dt <- pipload::pip_load_cache("IDN", 2015, "PC")
tar_load(dl_mean)
tar_load(cache_ids)
tar_load(aux_pop)
# Extract corresponding mean and cache ID
idt <- which(cache_ids == unique(dt$cache_id))
cache_id <- cache_ids[idt]
mean_i <- dl_mean[[idt]]
# Execute the function of interest
debugonce(pipdm:::compute_dist_stats)
ds <- pipdm::db_compute_dist_stats(dt = dt,
mean = mean_i,
pop = aux_pop,
cache_id = cache_id)
```
First, you load all the inputs. Since the target `dl_mean` is a relatively light object, we load it directly from the `_targets/` data store. The targets `cache_ids` and `aux_pop` are data frames, not lists, so we also load them with `tar_load()`. The microdata, however, is problematic because the target `cache`, which is the one passed to create the **actual** `dl_dist_stats` target, is a huge list with all the micro, grouped, and imputed data. The solution is then to load only the data frame of interest, using either `pipload` or `fst`.
Second, we need to filter the list `dl_mean` and the data frame `cache_ids` to pass only the information accepted by the `pipdm::db_compute_dist_stats()` function. This has to be done manually when debugging because, in the actual target, it is done iteratively by `pattern = map(cache, dl_mean, cache_ids)`.
Finally, you execute the function of interest. Notice something else: the target `aux_pop` is passed as a single data frame because `pipdm::db_compute_dist_stats()` requires it that way. This is also one of the reasons why these functions in `{pipdm}` need some fixing and more consistency in the format of their inputs.
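As an alternative to re-running the function, you can also read a single branch of `dl_dist_stats` straight from the data store with `tar_read()`. The sketch below assumes that `cache_id` values contain the country code and survey year, and it relies on the branches keeping the same order as `cache_ids` (which is why output targets must never be sorted):

```{r, eval=FALSE}
# Read only the IDN 2015 branch of dl_dist_stats from the _targets/ store
tar_load(cache_ids)
idt <- grep("IDN.*2015", cache_ids)
dist_idn <- tar_read(dl_dist_stats, branches = idt)
```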