Commit: [backport 2.0] Fix spark standalone and dien preprocessing (#4094) (#4103)

* add scripts

* fix dien

* update dien

* bug fix for classpath

* remove

* update readme

* add script

* minor

* add license

* minor
hkvision authored Feb 24, 2022
1 parent fc4b6bd commit eaf5b0c
Showing 13 changed files with 733 additions and 52 deletions.
5 changes: 3 additions & 2 deletions python/dllib/src/bigdl/dllib/utils/spark.py
@@ -243,7 +243,7 @@ def init_spark_standalone(self,
python_location if python_location else detect_python_location()
if not master:
pyspark_home = os.path.abspath(pyspark.__file__ + "/../")
zoo_standalone_home = os.path.abspath(__file__ + "/../../share/bin/standalone")
zoo_standalone_home = os.path.abspath(__file__ + "/../../../share/dllib/bin/standalone")
node_ip = get_node_ip()
SparkRunner.standalone_env = {
"SPARK_HOME": pyspark_home,
@@ -293,7 +293,8 @@ def init_spark_standalone(self,
"spark.cores.max": num_executors * executor_cores,
"spark.executorEnv.PYTHONHOME": "/".join(detect_python_location().split("/")[:-2])
})
zoo_bigdl_jar_path = ":".join(list(get_zoo_bigdl_classpath_on_driver()))
# Driver and executor are assumed to have the same Python environment
zoo_bigdl_jar_path = get_zoo_bigdl_classpath_on_driver()
if "spark.executor.extraClassPath" in conf:
conf["spark.executor.extraClassPath"] = "{}:{}".format(
zoo_bigdl_jar_path, conf["spark.executor.extraClassPath"])
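The hunk above drops the `":".join(list(...))` wrapping, so `zoo_bigdl_jar_path` is taken directly from `get_zoo_bigdl_classpath_on_driver()` and then prepended to `spark.executor.extraClassPath`. Below is a minimal sketch of that prepending logic, assuming the helper now returns a single colon-joined classpath string and using a plain dict as a stand-in for the Spark conf; the `else` branch is not visible in the hunk and is assumed here.

```python
# Hedged sketch of the classpath handling shown in the hunk above.
# Assumption: the driver-side helper returns one colon-joined string,
# e.g. "/opt/bigdl/jars/bigdl-dllib.jar:/opt/bigdl/jars/bigdl-dllib-spark.jar".
def prepend_executor_classpath(conf, zoo_bigdl_jar_path):
    """Prepend the dllib jars so they resolve before user-supplied entries."""
    existing = conf.get("spark.executor.extraClassPath")
    if existing:
        conf["spark.executor.extraClassPath"] = "{}:{}".format(zoo_bigdl_jar_path, existing)
    else:
        conf["spark.executor.extraClassPath"] = zoo_bigdl_jar_path
    return conf


# Example with hypothetical paths:
conf = {"spark.executor.extraClassPath": "/opt/extra/libs/*"}
prepend_executor_classpath(conf, "/opt/bigdl/jars/bigdl-dllib.jar")
print(conf["spark.executor.extraClassPath"])
# -> /opt/bigdl/jars/bigdl-dllib.jar:/opt/extra/libs/*
```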
3 changes: 2 additions & 1 deletion python/dllib/src/setup.py
@@ -95,7 +95,8 @@ def setup_package():
install_requires=['numpy>=1.19.5', 'pyspark==2.4.6', 'six>=1.10.0'],
dependency_links=['https://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz'],
include_package_data=True,
package_data={"bigdl.share.dllib": ['lib/bigdl-dllib*.jar', 'conf/*']},
package_data={"bigdl.share.dllib": ['lib/bigdl-dllib*.jar', 'conf/*',
'bin/standalone/*', 'bin/standalone/sbin/*']},
classifiers=[
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3',
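The extra `package_data` globs above are what ship the standalone launch scripts inside the `bigdl-dllib` wheel. A hedged post-install sanity check (not part of this commit) that mirrors the path arithmetic from the `spark.py` hunk could look like:

```python
# Hedged check: does the standalone script directory exist where
# init_spark_standalone now expects it after a pip install?
import os

import bigdl.dllib.utils.spark as spark_utils  # assumes bigdl-dllib is pip-installed

# Same relative-path arithmetic as the spark.py hunk above.
zoo_standalone_home = os.path.abspath(
    spark_utils.__file__ + "/../../../share/dllib/bin/standalone")
print("standalone scripts found:", os.path.isdir(zoo_standalone_home))
print("sbin directory present:", os.path.isdir(os.path.join(zoo_standalone_home, "sbin")))
```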
58 changes: 32 additions & 26 deletions python/friesian/example/dien/README.md
@@ -1,7 +1,7 @@
# Train DIEN using the Amazon book review dataset
This folder showcases how to use BigDL Friesian to preprocess and train a [DIEN](https://arxiv.org/pdf/1809.03672.pdf) model.
Model definition is based on [ai-matrix](https://github.com/alibaba/ai-matrix/tree/master/macro_benchmark/DIEN)
[Amazon book review](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books.json.gz) and [meta_books](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Books.json.gz) dataset to be used in this example.
Model definition is based on [ai-matrix](https://github.com/alibaba/ai-matrix/tree/master/macro_benchmark/DIEN), and
the [Amazon Book Reviews](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books.json.gz) dataset is used in this example.

## Prepare the environment
We recommend you to use [Anaconda](https://www.anaconda.com/distribution/#linux) to prepare the environments, especially if you want to run on a yarn cluster (yarn-client mode only).
@@ -13,18 +13,22 @@ pip install --pre --upgrade bigdl-friesian
```

## Prepare the data
1. Download meta_books data from [here](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Books.json.gz).
2. Download full book_review data from [here](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books.json.gz) which contains 22,507,155 records, or you can start from the [small dataset](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz) which contains 8,898,041 records.
1. Download meta_Books data from [here](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Books.json.gz).
2. Download full reviews_Books data from [here](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books.json.gz) which contains 22,507,155 records, or you can start from the [small dataset](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz) which contains 8,898,041 records.
3. Use the following script to convert `meta_Books.json` to `meta_Books.csv`:
```bash
python meta_to_csv.py --input_meta /path/to/meta_Books.json
```

## Preprocess the data
* Spark local, example command:
```bash
python dien_preprocessing.py \
--executor_cores 8 \
--executor_memory 50g \
--input_meta /path/to/the/folder/of/meta_books \
--input_transaction /path/to/the/folder/of/review_data\
--output /path/to/the/folder/to/save/preprocessed/parquet_files
--input_meta /path/to/meta_Books.csv \
--input_transaction /path/to/reviews_Books.json \
--output /path/to/the/folder/to/save/preprocessed/parquet/files
```

* Spark standalone, example command:
@@ -33,34 +33,35 @@ python dien_preprocessing.py \
--cluster_mode standalone \
--master spark://master-url:port \
--executor_cores 40 \
--executor_memory 240g \
--num_executor 8 \
--input_meta /path/to/the/folder/of/meta_books \
--input_transaction /path/to/the/folder/of/review_data\
--output /path/to/the/folder/to/save/preprocessed/parquet_files
--executor_memory 50g \
--num_executors 8 \
--input_meta /path/to/meta_Books.csv \
--input_transaction /path/to/reviews_Books.json \
--output /path/to/the/folder/to/save/preprocessed/parquet/files
```

* Spark yarn client mode, example command:
```bash
python dien_preprocessing.py \
--cluster_mode yarn \
--executor_cores 40 \
--executor_memory 240g \
--input_meta /path/to/the/folder/of/meta_books \
--input_transaction /path/to/the/folder/of/review_data\
--output /path/to/the/folder/to/save/preprocessed/parquet_files
--executor_memory 50g \
--num_executors 8 \
--input_meta /path/to/meta_Books.csv \
--input_transaction /path/to/reviews_Books.json \
--output /path/to/the/folder/to/save/preprocessed/parquet/files
```

__Options:__
* `cluster_mode`: The cluster mode to run the data preprocessing, one of local, yarn, standalone or spark-submit. Default to be local.
* `master`: The master URL, only used when cluster_mode is standalone.
* `executor_cores`: The number of cores to use on each node.
* `executor_memory`: The amount of memory to allocate on each node.
* `num_nodes`: The number of nodes to use in the cluster.
* `num_executors`: The number of nodes to use in the cluster.
* `driver_cores`: The number of cores to use for the driver.
* `driver_memory`: The amount of memory to allocate for the driver.
* `input_meta`: The path to the folder of meta_books jason files, either a local path or an HDFS path.
* `input_transaction`: The path to the folder of review_data jason files, either a local path or an HDFS path.
* `input_meta`: __Required.__ The path to `meta_Books.csv`, either a local path or an HDFS path.
* `input_transaction`: __Required.__ The path to `reviews_Books.json`, either a local path or an HDFS path.
* `output`: The path to save the preprocessed data to parquet files. HDFS path is recommended.
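
As a quick way to verify the `output` location once preprocessing finishes, the main table can be read back with plain Spark. This is only an illustrative sketch: it assumes `pyspark` is available on the driver and that the full table was written under the `data` folder beneath `--output`, as the `write_parquet` calls in `dien_preprocessing.py` further down suggest; the path below is a placeholder.

```python
# Hedged sketch for inspecting the preprocessed output.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect-dien-output").getOrCreate()
df = spark.read.parquet("/path/to/the/folder/to/save/preprocessed/parquet/files/data")
df.printSchema()
print("record count:", df.count())
spark.stop()
```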

## Train DIEN
@@ -70,7 +75,7 @@ python dien_train.py \
--executor_cores 8 \
--executor_memory 50g \
--batch_size 128 \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet_files \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet/files \
--model_dir /path/to/the/folder/to/save/trained/model
```

@@ -80,10 +85,10 @@ python dien_train.py \
--cluster_mode standalone \
--master spark://master-url:port \
--executor_cores 8 \
--executor_memory 240g \
--num_executor 8 \
--executor_memory 50g \
--num_executors 8 \
--batch_size 128 \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet_files \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet/files \
--model_dir /path/to/the/folder/to/save/trained/model
```

@@ -92,9 +97,10 @@ python dien_train.py \
python dien_train.py \
--cluster_mode yarn \
--executor_cores 8 \
--executor_memory 240g \
--executor_memory 50g \
--num_executors 8 \
--batch_size 128 \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet_files \
--data_dir /path/to/the/folder/to/save/preprocessed/parquet/files \
--model_dir /path/to/the/folder/to/save/trained/model
```

@@ -103,7 +109,7 @@ __Options:__
* `master`: The master URL, only used when cluster_mode is standalone.
* `executor_cores`: The number of cores to use on each node. Default to be 48.
* `executor_memory`: The amount of memory to allocate on each node. Default to be 240g.
* `num_nodes`: The number of nodes to use in the cluster. Default to be 40.
* `num_executors`: The number of nodes to use in the cluster. Default to be 40.
* `driver_cores`: The number of cores to use for the driver. Default to be 4.
* `driver_memory`: The amount of memory to allocate for the driver. Default to be 36g.
* `batch_size`: The batch size for training. Default to be 8.
33 changes: 14 additions & 19 deletions python/friesian/example/dien/dien_preprocessing.py
@@ -37,7 +37,6 @@

def _parse_args():
parser = ArgumentParser()

parser.add_argument('--cluster_mode', type=str, default="local",
help='The cluster mode, such as local, yarn, standalone or spark-submit.')
parser.add_argument('--master', type=str, default=None,
@@ -46,21 +45,17 @@ def _parse_args():
help='The executor core number.')
parser.add_argument('--executor_memory', type=str, default="160g",
help='The executor memory.')
parser.add_argument('--num_executor', type=int, default=8,
help='The number of executor.')
parser.add_argument('--num_executors', type=int, default=8,
help='The number of executors.')
parser.add_argument('--driver_cores', type=int, default=4,
help='The driver core number.')
parser.add_argument('--driver_memory', type=str, default="36g",
help='The driver memory.')
parser.add_argument('--input_transaction', type=str,
parser.add_argument('--input_transaction', type=str, required=True,
help="transaction files.")
parser.add_argument('--input_meta', type=str,
parser.add_argument('--input_meta', type=str, required=True,
help="item metadata file")
parser.add_argument('--output', default=".")
parser.add_argument(
'--write_mode',
choices=['overwrite', 'errorifexists'],
default='overwrite')
parser.add_argument('--output', default="./")

args = parser.parse_args()
return args
@@ -71,13 +66,13 @@ def _parse_args():
init_orca_context("local", cores=args.executor_cores, memory=args.executor_memory)
elif args.cluster_mode == "standalone":
init_orca_context("standalone", master=args.master,
cores=args.executor_cores, num_nodes=args.num_executor,
cores=args.executor_cores, num_nodes=args.num_executors,
memory=args.executor_memory,
driver_cores=args.driver_cores,
driver_memory=args.driver_memory, conf=conf)
elif args.cluster_mode == "yarn":
init_orca_context("yarn-client", cores=args.executor_cores,
num_nodes=args.num_executor, memory=args.executor_memory,
num_nodes=args.num_executors, memory=args.executor_memory,
driver_cores=args.driver_cores, driver_memory=args.driver_memory,
conf=conf)
elif args.cluster_mode == "spark-submit":
@@ -89,12 +84,12 @@ def _parse_args():
.rename({'reviewerID': 'user', 'asin': 'item', 'unixReviewTime': 'time'}) \
.dropna(columns=['user', 'item'])
transaction_tbl.cache()
print("transaction_tbl, ", transaction_tbl.size())
print("Total number of transactions: ", transaction_tbl.size())

item_tbl = FeatureTable.read_csv(args.input_meta, delimiter="\t", names=['item', 'category'])\
.apply("category", "category", lambda x: x.lower() if x is not None else "default")
item_tbl.cache()
print("item_tbl, ", item_tbl.size())
print("Total number of items: ", item_tbl.size())

user_index = transaction_tbl.gen_string_idx('user', freq_limit=1)
item_cat_indices = item_tbl.gen_string_idx(["item", "category"], freq_limit=1)
@@ -110,18 +105,18 @@ def _parse_args():
.add_neg_hist_seq(item_size, 'item_hist_seq', neg_num=5)\
.add_negative_samples(item_size, item_col='item', neg_num=1)\
.add_value_features(columns=["item", "item_hist_seq", "neg_item_hist_seq"],
dict_tbl=item_tbl, key="item", value="category")\
dict_tbl=item_tbl, key="item", value="category") \
.apply("item_hist_seq", "item_hist_seq_len", len, "int") \
.pad(cols=['item_hist_seq', 'category_hist_seq',
'neg_item_hist_seq', 'neg_category_hist_seq'],
seq_len=100,
mask_cols=['item_hist_seq']) \
.apply("item_hist_seq", "item_hist_seq_len", len, "int") \
.apply("label", "label", lambda x: [1 - float(x), float(x)], "array<float>")

# write out
user_index.write_parquet(args.output + "user_index")
item_cat_indices[0].write_parquet(args.output + "item_index")
item_cat_indices[1].write_parquet(args.output + "category_index")
user_index.write_parquet(args.output)
item_cat_indices[0].write_parquet(args.output)
item_cat_indices[1].write_parquet(args.output)
item_tbl.write_parquet(args.output + "item2cat")
full_tbl.write_parquet(args.output + "data")

8 changes: 4 additions & 4 deletions python/friesian/example/dien/dien_train.py
@@ -107,8 +107,8 @@ def load_dien_data(data_dir):
help='The executor core number.')
parser.add_argument('--executor_memory', type=str, default="160g",
help='The executor memory.')
parser.add_argument('--num_executor', type=int, default=8,
help='The number of executor.')
parser.add_argument('--num_executors', type=int, default=8,
help='The number of executors.')
parser.add_argument('--driver_cores', type=int, default=4,
help='The driver core number.')
parser.add_argument('--driver_memory', type=str, default="36g",
@@ -128,13 +128,13 @@ def load_dien_data(data_dir):
init_orca_context("local", cores=args.executor_cores, memory=args.executor_memory)
elif args.cluster_mode == "standalone":
init_orca_context("standalone", master=args.master,
cores=args.executor_cores, num_nodes=args.num_executor,
cores=args.executor_cores, num_nodes=args.num_executors,
memory=args.executor_memory,
driver_cores=args.driver_cores, driver_memory=args.driver_memory,
init_ray_on_spark=False)
elif args.cluster_mode == "yarn":
init_orca_context("yarn-client", cores=args.executor_cores,
num_nodes=args.num_executor, memory=args.executor_memory,
num_nodes=args.num_executors, memory=args.executor_memory,
driver_cores=args.driver_cores, driver_memory=args.driver_memory,
init_ray_on_spark=False)
elif args.cluster_mode == "spark-submit":
30 changes: 30 additions & 0 deletions python/friesian/example/dien/meta_to_csv.py
@@ -0,0 +1,30 @@
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--input_meta', type=str, required=True,
help="item metadata file")

args = parser.parse_args()
fi = open(args.input_meta, "r")
out_file = args.input_meta.split(".json")[0] + ".csv"
fo = open(out_file, "w")
for line in fi:
obj = eval(line)
cat = obj["categories"][0][-1]
print(obj["asin"] + "\t" + cat, file=fo)
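
For reference, `meta_to_csv.py` keeps only the ASIN and the last (most specific) entry of the first category path, writing them tab-separated. The snippet below walks one illustrative record through the same logic; the values are made up, not taken from the dataset.

```python
import ast

# One illustrative metadata record (hypothetical values, same shape as meta_Books.json lines).
line = "{'asin': '0000000000', 'categories': [['Books', 'Literature & Fiction']]}"
obj = ast.literal_eval(line)    # meta_to_csv.py uses eval(line) for the same purpose
cat = obj["categories"][0][-1]  # last entry of the first category path
print(obj["asin"] + "\t" + cat) # row format read back by dien_preprocessing.py via
                                # read_csv(..., delimiter="\t", names=['item', 'category'])
```

`ast.literal_eval` is used in the sketch as a safer drop-in for the `eval` call in the committed script.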
33 changes: 33 additions & 0 deletions scripts/standalone/sbin/spark-config.sh
@@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# included in all the spark scripts with source command
# should not be executable directly
# also should not be passed any arguments, since we need original $*

# symlink and absolute path should rely on SPARK_HOME to resolve
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi
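
These `sbin` scripts are the ones picked up by the new `package_data` globs in `setup.py` and resolved through `zoo_standalone_home` in `spark.py`. A hedged illustration of launching one of them programmatically follows; the `start-master.sh` name, the paths, and the env values are assumptions for illustration and are not taken from this commit.

```python
# Hedged illustration: run a packaged standalone script with an env dict
# similar to SparkRunner.standalone_env in the spark.py hunk above.
import os
import subprocess

pyspark_home = "/path/to/site-packages/pyspark"                 # placeholder
standalone_home = "/path/to/bigdl/share/dllib/bin/standalone"   # placeholder

env = dict(os.environ)
env["SPARK_HOME"] = pyspark_home  # spark-config.sh falls back to its own location if unset

subprocess.run(
    ["bash", os.path.join(standalone_home, "sbin", "start-master.sh")],  # assumed script name
    env=env, check=True)
```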