Commit

chapter 7 images temp
theja committed Oct 7, 2021
1 parent c8f7960 commit d2d2a84
Showing 32 changed files with 15,364 additions and 0 deletions.
567 changes: 567 additions & 0 deletions chapter7/Overview_of_Apache_Kafka.svg
Binary file added chapter7/externalip.png
Binary file added chapter7/externalip1.png
Binary file added chapter7/kinesis_social_media.png
Binary file added chapter7/nb1.png
Binary file added chapter7/nb2.png
Binary file added chapter7/nb3.png
Binary file added chapter7/nb4.png
Binary file added chapter7/nb5.png
14,378 changes: 14,378 additions & 0 deletions chapter7/producer_consumer_example.html

90 changes: 90 additions & 0 deletions chapter7/producer_consumer_example.ipynb
@@ -0,0 +1,90 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from kafka import KafkaProducer\n",
"from json import dumps\n",
"import time\n",
"producer = KafkaProducer(bootstrap_servers=['localhost:9092'],\n",
" value_serializer=lambda x: dumps(x).encode('utf-8'))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<kafka.producer.future.FutureRecordMetadata at 0x7f6a9365f9d0>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data = ['Hello from python', 'Theja from python']\n",
"producer.send('quickstart-events', data)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from kafka import KafkaConsumer\n",
"from json import loads\n",
"consumer = KafkaConsumer('quickstart-events',\n",
" bootstrap_servers=['localhost:9092'],\n",
" value_deserializer=lambda x: loads(x.decode('utf-8')))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'name': 'Theja'}\n"
]
}
],
"source": [
"for x in consumer:\n",
" print(x.value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
36 changes: 36 additions & 0 deletions chapter7/producer_consumer_example.py
@@ -0,0 +1,36 @@
#!/usr/bin/env python
# coding: utf-8

# In[1]:


from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))


# In[3]:


data = ['Hello from python', 'Theja from python']
producer.send('quickstart-events', data)


# In[12]:


from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer('quickstart-events',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda x: loads(x.decode('utf-8')))


# In[ ]:


for x in consumer:
    print(x.value)
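
A note on the script above: producer.send() is asynchronous and the consumer for-loop blocks indefinitely. A minimal companion sketch (not part of this commit), assuming the same kafka-python client and a broker on localhost:9092, that flushes the producer and bounds the read loop:

from kafka import KafkaProducer, KafkaConsumer
from json import dumps, loads

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
producer.send('quickstart-events', ['Hello from python'])
producer.flush()  # send() is async; block until pending records are delivered

consumer = KafkaConsumer('quickstart-events',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # also read records produced before startup
                         consumer_timeout_ms=5000,      # exit the loop after 5s with no new messages
                         value_deserializer=lambda x: loads(x.decode('utf-8')))
for msg in consumer:
    print(msg.offset, msg.value)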

Binary file added chapter7/prop1.png
Binary file added chapter7/prop2.png
Binary file added chapter7/prop3.png
Binary file added chapter7/s101a.png
Binary file added chapter7/s101b.png
66 changes: 66 additions & 0 deletions chapter7/spark_streaming_producer.ipynb
@@ -0,0 +1,66 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from kafka import KafkaProducer\n",
"from json import dumps\n",
"import time\n",
"producer = KafkaProducer(bootstrap_servers=['155.138.192.245:9092'],\n",
" value_serializer=lambda x: dumps(x).encode('utf-8'))"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<kafka.producer.future.FutureRecordMetadata at 0x112d76650>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data = {'uid':'196'}\n",
"producer.send('quickstart-events', data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
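
The notebook above emits a single {'uid': '196'} event. To drive the streaming job in streaming101 below, it helps to produce a paced sequence of events; a sketch under the same assumptions (kafka-python, the broker address from the notebook; the uids are arbitrary examples):

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(bootstrap_servers=['155.138.192.245:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
for uid in ['196', '186', '22']:  # arbitrary example user ids
    producer.send('quickstart-events', {'uid': uid})
    time.sleep(1)                 # pace events so the stream job sees them arrive over time
producer.flush()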
42 changes: 42 additions & 0 deletions chapter7/streaming101.html

1 change: 1 addition & 0 deletions chapter7/streaming101.ipynb
@@ -0,0 +1 @@
{"cells":[{"cell_type":"code","source":["from pyspark.sql.types import StringType\nimport json\nimport pandas as pd\n\ndef recommend(row):\n d = json.loads(row)\n result = {'uid':d['uid'] , 'pred': '' }\n return str(json.dumps(result))"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":1},{"cell_type":"code","source":["df = spark.readStream.format(\"kafka\") \\\n .option(\"kafka.bootstrap.servers\", \"155.138.192.245:9092\") \\\n .option(\"subscribe\", \"quickstart-events\") \\\n .option(\"startingOffsets\", \"latest\").load()"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":2},{"cell_type":"code","source":["df.printSchema()"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\">root\n-- key: binary (nullable = true)\n-- value: binary (nullable = true)\n-- topic: string (nullable = true)\n-- partition: integer (nullable = true)\n-- offset: long (nullable = true)\n-- timestamp: timestamp (nullable = true)\n-- timestampType: integer (nullable = true)\n\n</div>"]}}],"execution_count":3},{"cell_type":"code","source":["df = df.selectExpr(\"CAST(value AS STRING)\")\nrecommend_udf = udf(recommend, StringType())\ndf = df.select(recommend_udf(\"value\").alias(\"value\"))"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":4},{"cell_type":"code","source":["query = df.writeStream.format(\"kafka\")\\\n .option(\"kafka.bootstrap.servers\", \"155.138.192.245:9092\")\\\n .option(\"topic\", \"recommendation-events\")\\\n .option(\"checkpointLocation\", \"/temp\").start().awaitTermination()"],"metadata":{},"outputs":[],"execution_count":5}],"metadata":{"name":"streaming101","notebookId":2246108946536934},"nbformat":4,"nbformat_minor":0}
33 changes: 33 additions & 0 deletions chapter7/streaming101.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Databricks notebook source
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf  # explicit import so the script also runs outside a Databricks notebook
import json
import pandas as pd

def recommend(row):
    d = json.loads(row)
    result = {'uid': d['uid'], 'pred': ''}
    return str(json.dumps(result))

# COMMAND ----------

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "155.138.192.245:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "latest").load()

# COMMAND ----------

df.printSchema()

# COMMAND ----------

df = df.selectExpr("CAST(value AS STRING)")
recommend_udf = udf(recommend, StringType())
df = df.select(recommend_udf("value").alias("value"))

# COMMAND ----------

query = df.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "155.138.192.245:9092") \
    .option("topic", "recommendation-events") \
    .option("checkpointLocation", "/temp").start().awaitTermination()
42 changes: 42 additions & 0 deletions chapter7/streaming_recs.html

1 change: 1 addition & 0 deletions chapter7/streaming_recs.ipynb
@@ -0,0 +1 @@
{"cells":[{"cell_type":"markdown","source":["In Cmd 2, the AWS_ACCESS_KEY and AWS_SECRET_KEY variables are set and kept hidden."],"metadata":{}},{"cell_type":"code","source":["AWS_ACCESS_KEY = \"notsecret\"\nAWS_SECRET_KEY = \"secret\""],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":2},{"cell_type":"code","source":["sc._jsc.hadoopConfiguration().set(\"fs.s3n.awsAccessKeyId\", AWS_ACCESS_KEY)\nsc._jsc.hadoopConfiguration().set(\"fs.s3n.awsSecretAccessKey\", AWS_SECRET_KEY)"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":3},{"cell_type":"code","source":["df = spark.read.csv(\"s3://databricks-recsys/u.data\",header=True, sep=\"\\t\",inferSchema = True)\npdf = df.toPandas()"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":4},{"cell_type":"code","source":["!pip install scikit-surprise"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":5},{"cell_type":"code","source":["# https://github.com/NicolasHug/Surprise\n#https://github.com/NicolasHug/Surprise/blob/master/examples/top_n_recommendations.py\nfrom surprise import SVD, Dataset, Reader\nfrom surprise.accuracy import rmse\nfrom collections import defaultdict"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":6},{"cell_type":"code","source":["def get_top_n(predictions, n=10):\n \"\"\"Return the top-N recommendation for each user from a set of predictions.\n Args:\n predictions(list of Prediction objects): The list of predictions, as\n returned by the test method of an algorithm.\n n(int): The 
number of recommendation to output for each user. Default\n is 10.\n Returns:\n A dict where keys are user (raw) ids and values are lists of tuples:\n [(raw item id, rating estimation), ...] of size n.\n \"\"\"\n\n # First map the predictions to each user.\n top_n = defaultdict(list)\n for uid, iid, true_r, est, _ in predictions:\n top_n[uid].append((iid, est))\n\n # Then sort the predictions for each user and retrieve the k highest ones.\n for uid, user_ratings in top_n.items():\n user_ratings.sort(key=lambda x: x[1], reverse=True)\n top_n[uid] = user_ratings[:n]\n\n return top_n"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":7},{"cell_type":"code","source":["# A reader is still needed but only the rating_scale param is requiered.\nreader = Reader(rating_scale=(1, 5))\n\n# The columns must correspond to user id, item id and ratings (in that order).\ndata = Dataset.load_from_df(pdf[['uid', 'iid', 'rating']], reader)"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":8},{"cell_type":"code","source":["# Load the movielens-100k dataset (download it if needed).\ntrainset = data.build_full_trainset()\n\n# Use an example algorithm: SVD.\nalgo = SVD()\nalgo.fit(trainset) "],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\">Out[8]: &lt;surprise.prediction_algorithms.matrix_factorization.SVD at 0x7f0dac0af9d0&gt;</div>"]}}],"execution_count":9},{"cell_type":"code","source":["#actual predictions as thse items have not been seen by the users. there is no ground truth. 
\n# We predict ratings for all pairs (u, i) that are NOT in the training set.\ntestset = trainset.build_anti_testset()\npredictions = algo.test(testset)\ntop_n = get_top_n(predictions, n=10)"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":10},{"cell_type":"code","source":["from pyspark.sql.types import StringType\nimport json\nimport pandas as pd\n\ndef recommend(row):\n d = json.loads(row)\n result = {'uid':d['uid'] , 'pred': [x[0] for x in top_n[int(d['uid'])]] }\n return str(json.dumps(result))"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":11},{"cell_type":"code","source":["df = spark.readStream.format(\"kafka\") \\\n .option(\"kafka.bootstrap.servers\", \"155.138.192.245:9092\") \\\n .option(\"subscribe\", \"quickstart-events\") \\\n .option(\"startingOffsets\", \"latest\").load()\ndf = df.selectExpr(\"CAST(value AS STRING)\")\nrecommend_udf = udf(recommend, StringType())\ndf = df.select(recommend_udf(\"value\").alias(\"value\"))"],"metadata":{},"outputs":[{"output_type":"display_data","metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":12},{"cell_type":"code","source":["query = df.writeStream.format(\"kafka\")\\\n .option(\"kafka.bootstrap.servers\", \"155.138.192.245:9092\")\\\n .option(\"topic\", \"recommendation-events\")\\\n .option(\"checkpointLocation\", \"/temp\").start().awaitTermination()"],"metadata":{},"outputs":[],"execution_count":13}],"metadata":{"name":"streaming_recs","notebookId":2246108946536949},"nbformat":4,"nbformat_minor":0}
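
To verify the end-to-end loop, the recommendation-events topic written by this notebook can be read back with the same kafka-python consumer pattern used earlier; a sketch assuming the broker address from the notebooks:

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer('recommendation-events',
                         bootstrap_servers=['155.138.192.245:9092'],
                         value_deserializer=lambda x: loads(x.decode('utf-8')))
for msg in consumer:
    print(msg.value)  # e.g. {"uid": "196", "pred": [<top-10 item ids>]}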