Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notebook-example #631

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 321 additions & 0 deletions examples/notebook/Nats_notebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9986b247",
"metadata": {},
"source": [
"# Nats notebook"
]
},
{
"cell_type": "markdown",
"id": "aa5e9921",
"metadata": {},
"source": [
"Follow instructions from https://docs.nats.io/running-a-nats-service/nats_docker to set up the Nats server.\n",
"\n",
"Create a network\n",
"```\n",
"docker network create nats\n",
"```\n",
"Set up a server\n",
"```\n",
"docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 --network nats --name nats-server -ti nats:<version> -js\n",
"```\n",
"Add your Jupyter docker container to the network. Instead of '<your_jupyter_container_name>>, you need to provide the actual name of your Jupyter notebook container. You can find it using docker ps command.\n",
"```\n",
"docker network connect nats <your_jupyter_container_name>\n",
"```\n",
"Install Python client on your notebook container\n",
"```\n",
"pip install nats-py\n",
"```\n",
"Follow examples from https://github.com/nats-io/nats.py/tree/main"
]
},
{
"cell_type": "markdown",
"id": "2492c7e7",
"metadata": {},
"source": [
"## Getting started example"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "2f42a4da",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:05:50.265069Z",
"start_time": "2024-11-09T15:05:50.156794Z"
}
},
"outputs": [],
"source": [
"from nats import NATS\n",
"import asyncio\n",
"import nats\n",
"from nats.errors import ConnectionClosedError, TimeoutError, NoServersError"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "1467a4b8",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:05:50.998769Z",
"start_time": "2024-11-09T15:05:50.975109Z"
}
},
"outputs": [],
"source": [
"async def main():\n",
" # It is very likely that the demo server will see traffic from clients other than yours.\n",
" # To avoid this, start your own locally and modify the example to use it. \n",
" # Use your server container name\n",
" nc = await nats.connect(\"nats://nats-server:4222\")\n",
"\n",
" # You can also use the following for TLS against the demo server.\n",
" #\n",
" # nc = await nats.connect(\"tls://demo.nats.io:4443\")\n",
"\n",
" async def message_handler(msg):\n",
" subject = msg.subject\n",
" reply = msg.reply\n",
" data = msg.data.decode()\n",
" print(\"Received a message on '{subject} {reply}': {data}\".format(\n",
" subject=subject, reply=reply, data=data))\n",
"\n",
" # Simple publisher and async subscriber via coroutine.\n",
" sub = await nc.subscribe(\"foo\", cb=message_handler)\n",
"\n",
" # Stop receiving after 2 messages.\n",
" await sub.unsubscribe(limit=2)\n",
" await nc.publish(\"foo\", b'Hello')\n",
" await nc.publish(\"foo\", b'World')\n",
" await nc.publish(\"foo\", b'!!!!!')\n",
"\n",
" # Synchronous style with iterator also supported.\n",
" sub = await nc.subscribe(\"bar\")\n",
" await nc.publish(\"bar\", b'First')\n",
" await nc.publish(\"bar\", b'Second')\n",
"\n",
" try:\n",
" async for msg in sub.messages:\n",
" print(f\"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}\")\n",
" await sub.unsubscribe()\n",
" except Exception as e:\n",
" pass\n",
"\n",
" async def help_request(msg):\n",
" print(f\"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}\")\n",
" await nc.publish(msg.reply, b'I can help')\n",
"\n",
" # Use queue named 'workers' for distributing requests\n",
" # among subscribers.\n",
" sub = await nc.subscribe(\"help\", \"workers\", help_request)\n",
"\n",
" # Send a request and expect a single response\n",
" # and trigger timeout if not faster than 500 ms.\n",
" try:\n",
" response = await nc.request(\"help\", b'help me', timeout=0.5)\n",
" print(\"Received response: {message}\".format(\n",
" message=response.data.decode()))\n",
" except TimeoutError:\n",
" print(\"Request timed out\")\n",
"\n",
" # Remove interest in subscription.\n",
" await sub.unsubscribe()\n",
"\n",
" # Terminate connection to NATS.\n",
" await nc.drain()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "4e63ae76",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:05:53.633998Z",
"start_time": "2024-11-09T15:05:53.626039Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Received a message on 'foo ': Hello\n",
"Received a message on 'foo ': World\n",
"Received a message on 'bar ': First\n",
"Received a message on 'bar ': Second\n",
"Received a message on 'help _INBOX.jldI3AbJSaeL1hTrWR1pbM.jldI3AbJSaeL1hTrWR1peka617': help me\n",
"Received response: I can help\n"
]
}
],
"source": [
"await main()"
]
},
{
"cell_type": "markdown",
"id": "113c71ca",
"metadata": {},
"source": [
"## JetStream and key-value example"
]
},
{
"cell_type": "markdown",
"id": "5f141165",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:08:15.913166Z",
"start_time": "2024-11-09T15:08:15.899846Z"
}
},
"source": [
"See \n",
"https://github.com/nats-io/nats.py/blob/main/examples/kv.py"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "c77717b7",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:08:59.879427Z",
"start_time": "2024-11-09T15:08:59.872731Z"
}
},
"outputs": [],
"source": [
"async def kv_main():\n",
" nc = await nats.connect(\"nats://nats-server:4222\")\n",
" js = nc.jetstream()\n",
"\n",
" # Create a KV\n",
" kv = await js.create_key_value(bucket=\"MY_KV\")\n",
"\n",
" # Set and retrieve a value\n",
" await kv.put(\"hello\", b\"world\")\n",
" await kv.put(\"goodbye\", b\"farewell\")\n",
" await kv.put(\"greetings\", b\"hi\")\n",
" await kv.put(\"greeting\", b\"hey\")\n",
"\n",
" # Retrieve and print the value of 'hello'\n",
" entry = await kv.get(\"hello\")\n",
" print(f\"KeyValue.Entry: key={entry.key}, value={entry.value}\")\n",
" # KeyValue.Entry: key=hello, value=world\n",
"\n",
" # Retrieve keys with filters\n",
" filtered_keys = await kv.keys(filters=[\"hello\", \"greet\"])\n",
" print(f\"Filtered Keys: {filtered_keys}\")\n",
" # Expected Output: ['hello', 'greetings']\n",
"\n",
" await nc.close()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "c2437c2b",
"metadata": {
"ExecuteTime": {
"end_time": "2024-11-09T15:09:16.349726Z",
"start_time": "2024-11-09T15:09:16.338173Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"KeyValue.Entry: key=hello, value=b'world'\n",
"Filtered Keys: ['hello', 'goodbye', 'greetings', 'greeting']\n"
]
}
],
"source": [
"await kv_main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d48f2dd3",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.10"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading