Skip to content

Commit

Permalink
Perftune (#180)
Browse files Browse the repository at this point in the history
* Load redis data once and only once per request

* Cache all redis data into the local instance

* Remove warm_caches function as it does nothing

* Added more verbose logging with performance profiling data

* added redis read timer to logging
  • Loading branch information
Victor Ng authored Sep 2, 2020
1 parent b33a1b6 commit 4badf3f
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 138 deletions.
8 changes: 3 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ up:
-e WORKERS=1 \
-e THREADS=2 \
-e LOG_LEVEL=20 \
-e GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/vng-taar-dev-clientinfo-svc.json \
-e GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/vng-taar-stage.json \
-e TAAR_API_PLUGIN=taar.plugin \
-e TAAR_ITEM_MATRIX_BUCKET=telemetry-public-analysis-2 \
-e TAAR_ITEM_MATRIX_KEY=telemetry-ml/addon_recommender/item_matrix.json \
Expand All @@ -49,10 +49,8 @@ up:
-e TAARLITE_MAX_RESULTS=4 \
-e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
-e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
-e BIGTABLE_PROJECT_ID=${BIGTABLE_PROJECT_ID} \
-e BIGTABLE_INSTANCE_ID=${BIGTABLE_INSTANCE_ID} \
-e BIGTABLE_TABLE_ID=${BIGTABLE_TABLE_ID} \
-e GCLOUD_PROJECT=${GCLOUD_PROJECT} \
-e BIGTABLE_PROJECT_ID=moz-fx-data-taar-nonprod-48b6 \
-e BIGTABLE_INSTANCE_ID=taar-stage-202006 \
-p 8000:8000 \
-it taar:latest

Expand Down
12 changes: 0 additions & 12 deletions taar/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,6 @@ def acquire_taar_singleton(PROXY_MANAGER):
return PROXY_MANAGER.getTaarRM()


def warm_caches():
    """Eagerly build the taarlite singleton so the first request is served warm.

    No-ops under pytest so the test suite's own singleton is not clobbered.
    """
    import sys

    global PROXY_MANAGER

    running_under_pytest = "pytest" in sys.modules
    if running_under_pytest:
        return

    acquire_taarlite_singleton(PROXY_MANAGER)


class ResourceProxy(object):
def __init__(self):
self._resource = None
Expand Down Expand Up @@ -265,5 +254,4 @@ def set(self, config_options):
if "PROXY_RESOURCE" in config_options:
PROXY_MANAGER._resource = config_options["PROXY_RESOURCE"]

warm_caches()
return MyPlugin()
15 changes: 9 additions & 6 deletions taar/profile_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from decouple import config
from srgutil.interfaces import IMozLogging
from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters
import json
import zlib
import datetime

BIGTABLE_PROJECT_ID = config(
"BIGTABLE_PROJECT_ID", default="cfr-personalization-experiment"
from taar.settings import (
BIGTABLE_PROJECT_ID,
BIGTABLE_INSTANCE_ID,
BIGTABLE_TABLE_ID,
)
BIGTABLE_INSTANCE_ID = config("BIGTABLE_INSTANCE_ID", default="taar-profile")
BIGTABLE_TABLE_ID = config("BIGTABLE_TABLE_ID", default="test-table")
import markus


metrics = markus.get_metrics("taar")


class BigTableProfileController:
Expand Down Expand Up @@ -109,6 +111,7 @@ def _client(self):
def set_client(self, client):
self.__client = client

@metrics.timer_decorator("bigtable_read")
def get(self, client_id):
try:
profile_data = self._client.get_client_profile(client_id)
Expand Down
40 changes: 18 additions & 22 deletions taar/recommenders/collaborative_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,27 @@ def __init__(self, ctx):

self._redis_cache = TAARCache.get_instance(self._ctx)

self.model = None
def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
return tmp

@property
def addon_mapping(self):
    """Collaborative-filter addon mapping read from the Redis-backed cache.

    Fetched on every access; nothing is memoized locally.
    """
    return self._redis_cache.collab_addon_mapping()

@property
def raw_item_matrix(self):
val = self._redis_cache.collab_raw_item_matrix()
if val not in (None, ""):
# Build a dense numpy matrix out of it.
num_rows = len(val)
num_cols = len(val[0]["features"])

self.model = np.zeros(shape=(num_rows, num_cols))
for index, row in enumerate(val):
self.model[index, :] = row["features"]
else:
self.model = None
return val
return self._redis_cache.collab_raw_item_matrix()

def can_recommend(self, client_data, extra_data={}):
cache = self._get_cache(extra_data)
# We can't recommend if we don't have our data files.
if (
self.raw_item_matrix is None
or self.model is None
or self.addon_mapping is None
cache["raw_item_matrix"] is None
or cache["collab_model"] is None
or cache["addon_mapping"] is None
):
return False

Expand All @@ -80,6 +74,8 @@ def can_recommend(self, client_data, extra_data={}):
return False

def _recommend(self, client_data, limit, extra_data):
cache = self._get_cache(extra_data)

installed_addons_as_hashes = [
positive_hash(addon_id)
for addon_id in client_data.get("installed_addons", [])
Expand All @@ -90,35 +86,35 @@ def _recommend(self, client_data, limit, extra_data):
query_vector = np.array(
[
1.0 if (entry.get("id") in installed_addons_as_hashes) else 0.0
for entry in self.raw_item_matrix
for entry in cache["raw_item_matrix"]
]
)

# Build the user factors matrix.
user_factors = np.matmul(query_vector, self.model)
user_factors = np.matmul(query_vector, cache["collab_model"])
user_factors_transposed = np.transpose(user_factors)

# Compute the distance between the user and all the addons in the latent
# space.
distances = {}
for addon in self.raw_item_matrix:
for addon in cache["raw_item_matrix"]:
# We don't really need to show the items we requested.
# They will always end up with the greatest score. Also
# filter out legacy addons from the suggestions.
hashed_id = addon.get("id")
str_hashed_id = str(hashed_id)
if (
hashed_id in installed_addons_as_hashes
or str_hashed_id not in self.addon_mapping
or self.addon_mapping[str_hashed_id].get("isWebextension", False)
or str_hashed_id not in cache["addon_mapping"]
or cache["addon_mapping"][str_hashed_id].get("isWebextension", False)
is False
):
continue

dist = np.dot(user_factors_transposed, addon.get("features"))
# Read the addon ids from the "addon_mapping" looking it
            # up by 'id' (which is a hashed value).
addon_id = self.addon_mapping[str_hashed_id].get("id")
addon_id = cache["addon_mapping"][str_hashed_id].get("id")
distances[addon_id] = dist

# Sort the suggested addons by their score and return the
Expand Down
26 changes: 26 additions & 0 deletions taar/recommenders/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

import contextlib
import time


@contextlib.contextmanager
def log_timer_debug(msg, logger):
    """Context manager that DEBUG-logs how long the wrapped block took.

    Emits ``"<msg> Completed in <seconds> seconds"`` in the ``finally``
    clause, so the duration is logged even when the block raises.

    Uses ``time.monotonic()`` rather than ``time.time()``: the monotonic
    clock cannot jump backwards on system clock adjustments, so it is the
    correct clock for measuring elapsed time.
    """
    start_time = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start_time
        logger.debug(msg + f" Completed in {elapsed} seconds")


@contextlib.contextmanager
def log_timer_info(msg, logger):
    """Context manager that INFO-logs how long the wrapped block took.

    Emits ``"<msg> Completed in <seconds> seconds"`` in the ``finally``
    clause, so the duration is logged even when the block raises.

    Uses ``time.monotonic()`` rather than ``time.time()``: the monotonic
    clock cannot jump backwards on system clock adjustments, so it is the
    correct clock for measuring elapsed time.
    """
    start_time = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start_time
        logger.info(msg + f" Completed in {elapsed} seconds")
36 changes: 22 additions & 14 deletions taar/recommenders/ensemble_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from taar.utils import hasher
from taar.recommenders.redis_cache import TAARCache

from taar.recommenders.debug import log_timer_info
import markus

metrics = markus.get_metrics("taar")
Expand Down Expand Up @@ -39,6 +39,12 @@ def __init__(self, ctx):

self._init_from_ctx()

def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
return tmp

def getWeights(self):
    """Return the per-recommender ensemble weights from the Redis-backed cache.

    NOTE(review): camelCase name kept as-is — callers may depend on it.
    """
    return self._redis_cache.ensemble_weights()

Expand Down Expand Up @@ -66,10 +72,11 @@ def can_recommend(self, client_data, extra_data={}):

@metrics.timer_decorator("ensemble_recommend")
def recommend(self, client_data, limit, extra_data={}):
cache = self._get_cache(extra_data)
client_id = client_data.get("client_id", "no-client-id")

if is_test_client(client_id):
whitelist = self._redis_cache.whitelist_data()
whitelist = cache["whitelist"]
samples = whitelist[:limit]
self.logger.info("Test ID detected [{}]".format(client_id))

Expand Down Expand Up @@ -102,6 +109,7 @@ def _recommend(self, client_data, limit, extra_data={}):
weight each recommender appropriate so that the ordering is
correct.
"""
cache = self._get_cache(extra_data)
self.logger.info("Ensemble recommend invoked")
preinstalled_addon_ids = client_data.get("installed_addons", [])

Expand All @@ -110,20 +118,20 @@ def _recommend(self, client_data, limit, extra_data={}):
extended_limit = limit + len(preinstalled_addon_ids)

flattened_results = []
ensemble_weights = self._redis_cache.ensemble_weights()
ensemble_weights = cache["ensemble_weights"]

for rkey in self.RECOMMENDER_KEYS:
recommender = self._recommender_map[rkey]

if recommender.can_recommend(client_data):
raw_results = recommender.recommend(
client_data, extended_limit, extra_data
)
reweighted_results = []
for guid, weight in raw_results:
item = (guid, weight * ensemble_weights[rkey])
reweighted_results.append(item)
flattened_results.extend(reweighted_results)
with log_timer_info(f"{rkey} recommend invoked", self.logger):
recommender = self._recommender_map[rkey]
if recommender.can_recommend(client_data, extra_data):
raw_results = recommender.recommend(
client_data, extended_limit, extra_data
)
reweighted_results = []
for guid, weight in raw_results:
item = (guid, weight * ensemble_weights[rkey])
reweighted_results.append(item)
flattened_results.extend(reweighted_results)

# Sort the results by the GUID
flattened_results.sort(key=lambda item: item[0])
Expand Down
25 changes: 1 addition & 24 deletions taar/recommenders/guid_based_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

from contextlib import contextmanager

import time

from srgutil.interfaces import IMozLogging

import markus

from taar.recommenders.redis_cache import TAARCache

from taar.recommenders.debug import log_timer_debug

metrics = markus.get_metrics("taar")

Expand All @@ -22,26 +19,6 @@
NORM_MODE_GUIDCEPTION = "guidception"


@contextmanager
def log_timer_debug(msg, logger):
    """Debug-log the wall-clock duration of the wrapped block.

    The message is emitted in ``finally`` so it appears even on error.
    """
    t0 = time.time()
    try:
        yield
    finally:
        logger.debug(msg + f" Completed in {time.time() - t0} seconds")


@contextmanager
def log_timer_info(msg, logger):
    """Info-log the wall-clock duration of the wrapped block.

    The message is emitted in ``finally`` so it appears even on error.
    """
    t0 = time.time()
    try:
        yield
    finally:
        logger.info(msg + f" Completed in {time.time() - t0} seconds")


class GuidBasedRecommender:
""" A recommender class that returns top N addons based on a
passed addon identifier. This will load a json file containing
Expand Down
19 changes: 13 additions & 6 deletions taar/recommenders/locale_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@ def __init__(self, ctx):

self._redis_cache = TAARCache.get_instance(self._ctx)

# DONE removed
def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
return tmp

@property
def top_addons_per_locale(self):
    """Per-locale top-addons table read from the Redis-backed cache.

    Fetched on every access; nothing is memoized locally.
    """
    return self._redis_cache.top_addons_per_locale()

def can_recommend(self, client_data, extra_data={}):
cache = self._get_cache(extra_data)

# We can't recommend if we don't have our data files.
if self.top_addons_per_locale is None:
if cache["top_addons_per_locale"] is None:
return False

# If we have data coming from other sources, we can use that for
Expand All @@ -48,10 +55,10 @@ def can_recommend(self, client_data, extra_data={}):
if not isinstance(client_locale, str):
return False

if client_locale not in self.top_addons_per_locale:
if client_locale not in cache["top_addons_per_locale"]:
return False

if not self.top_addons_per_locale.get(client_locale):
if not cache["top_addons_per_locale"].get(client_locale):
return False

return True
Expand All @@ -62,7 +69,6 @@ def recommend(self, client_data, limit, extra_data={}):
result_list = self._recommend(client_data, limit, extra_data)
except Exception as e:
result_list = []
self._top_addons_per_locale.force_expiry()
metrics.incr("error_locale", value=1)
self.logger.exception(
"Locale recommender crashed for {}".format(
Expand All @@ -74,10 +80,11 @@ def recommend(self, client_data, limit, extra_data={}):
return result_list

def _recommend(self, client_data, limit, extra_data={}):
cache = self._get_cache(extra_data)
        # If we have data coming from multiple sources, prefer the one
# from 'client_data'.
client_locale = client_data.get("locale") or extra_data.get("locale", None)
result_list = self.top_addons_per_locale.get(client_locale, [])[:limit]
result_list = cache["top_addons_per_locale"].get(client_locale, [])[:limit]

if "locale" not in client_data:
try:
Expand Down
Loading

0 comments on commit 4badf3f

Please sign in to comment.