-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathupload_tutorials.py
45 lines (38 loc) · 1.22 KB
/
upload_tutorials.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import time
import os
import typesense
import json
async def main():
typesense_client = typesense.Client({
'api_key': os.environ['TYPESENSE_TUTORIALS_API_KEY'],
'nodes': [{
'host': 'cgnvrk0xwyj9576lp-1.a1.typesense.net',
'port': '443',
'protocol': 'https'
}],
'connection_timeout_seconds': 2
})
time_now = int(time.time())
# Get tutorials from tutorials/typesense.json
with open('tutorials/typesense.json') as f:
resources = json.load(f)
for r in resources:
print("RESOURCE")
r["date"] = int(r["date"])
print(r)
r["last_updated"] = time_now
print(r)
insert_resp = typesense_client.collections['tutorials'].documents.upsert(r)
print("INSERTED")
print(insert_resp)
# Deleting documents that didn't get updated (presumably deleted)
try:
res = typesense_client.collections['tutorials'].documents.delete({'filter_by': 'last_updated: <' + str(time_now)})
print("Resources deleted")
print(res)
except Exception as e:
print(e)
pass
if __name__ == '__main__':
asyncio.run(main())