-
Notifications
You must be signed in to change notification settings - Fork 0
/
extract_async
79 lines (62 loc) · 2.49 KB
/
extract_async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import asyncio
import logging
import time

import boto3
# Configure logging: set the root logger to INFO so the progress messages
# emitted below are shown, and create a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def async_textract_processing(s3_paths):
    """Process every document in *s3_paths* with Textract and wait for all of them.

    Each path is split into a bucket name and an object key with
    ``extract_bucket_and_key``; one asyncio task running ``process_document``
    is scheduled per document, and the coroutine finishes once every task
    has completed.

    Args:
        s3_paths: Iterable of S3 paths such as ``s3://bucket/path/to/doc.pdf``.

    Returns:
        None.
    """
    # A single Textract client is created once and handed to every task.
    client = boto3.client('textract')

    # Schedule one task per document; none of them starts running until this
    # coroutine yields control at the gather() below.
    pending = [
        asyncio.create_task(
            process_document(client, *extract_bucket_and_key(path))
        )
        for path in s3_paths
    ]

    # Block until all scheduled documents have been handled.
    await asyncio.gather(*pending)
def extract_bucket_and_key(s3_path):
    """Split an S3 path into its bucket name and object key.

    A leading ``s3://`` scheme is removed if present. Only the prefix is
    stripped, so an object key that itself contains the text ``s3://`` is
    returned unchanged. The bucket is everything up to the first ``/``;
    the key is everything after it.

    Args:
        s3_path: A path such as ``s3://bucket/path/to/doc.pdf`` (the scheme
            is optional).

    Returns:
        A ``(bucket_name, object_key)`` tuple. ``object_key`` is the empty
        string when the path contains no ``/`` after the bucket.
    """
    scheme = 's3://'
    # Strip the scheme only at the start; str.replace() would also delete
    # any later occurrence inside the object key.
    if s3_path.startswith(scheme):
        s3_path = s3_path[len(scheme):]

    # Split once on the first slash: bucket on the left, full key on the right.
    bucket_name, _, object_key = s3_path.partition('/')
    return bucket_name, object_key
async def process_document(client, bucket_name, object_key):
    """Run Textract text detection for one S3 object without blocking the loop.

    ``call_textract`` is synchronous (it polls and sleeps), so it is executed
    in the event loop's default executor. This lets the tasks created for
    several documents make progress concurrently instead of one after another.

    Args:
        client: Textract client passed through to ``call_textract``.
        bucket_name: Name of the S3 bucket holding the document.
        object_key: Key of the document inside the bucket.

    Returns:
        Whatever ``call_textract`` returned, or ``None`` if it raised.
        Errors are logged rather than propagated so that one bad document
        does not abort the others.
    """
    logger.info(f"Processing document: {object_key}")
    loop = asyncio.get_running_loop()
    try:
        # NOTE: the client is shared with worker threads; boto3 documents its
        # low-level clients as generally thread-safe.
        result = await loop.run_in_executor(
            None, call_textract, client, bucket_name, object_key
        )
    except Exception as e:
        # Deliberately best-effort: report the failure and keep going.
        logger.error(f"Error processing document {object_key}: {e}")
        return None

    # call_textract returns the final response even when the job failed, so
    # check the status before reporting success.
    if isinstance(result, dict) and result.get('JobStatus') == 'FAILED':
        logger.error(
            f"Textract job failed for document {object_key}: "
            f"{result.get('StatusMessage')}"
        )
    else:
        logger.info(f"Document {object_key} processed!")

    # Handle the result as needed.
    # You can access the text and other information from the result object.
    return result
def call_textract(client, bucket_name, object_key, poll_interval=5, timeout=None):
    """Start a Textract text-detection job and block until it finishes.

    The job is started with ``start_document_text_detection`` and then polled
    with ``get_document_text_detection`` until its status is no longer
    ``IN_PROGRESS``. Stopping on "not in progress" rather than on a fixed
    list of statuses means any terminal status (including
    ``PARTIAL_SUCCESS``) ends the loop instead of polling forever.

    Args:
        client: Textract client exposing ``start_document_text_detection``
            and ``get_document_text_detection``.
        bucket_name: Name of the S3 bucket holding the document.
        object_key: Key of the document inside the bucket.
        poll_interval: Seconds to sleep between status polls.
        timeout: Optional maximum number of seconds to wait for the job;
            ``None`` waits indefinitely.

    Returns:
        The response of the final ``get_document_text_detection`` call. The
        caller should inspect ``response['JobStatus']``; a failed job is
        returned, not raised.
        NOTE: only this single response is returned; a ``NextToken`` in it
        (additional result pages) is not followed here.

    Raises:
        TimeoutError: If *timeout* is set and the job is still in progress
            once it has elapsed.
    """
    response = client.start_document_text_detection(
        DocumentLocation={'S3Object': {'Bucket': bucket_name, 'Name': object_key}}
    )
    job_id = response['JobId']

    # A monotonic clock is unaffected by wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    # Poll for job completion
    while True:
        response = client.get_document_text_detection(JobId=job_id)
        if response['JobStatus'] != 'IN_PROGRESS':
            return response
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Textract job {job_id} for {object_key} did not finish "
                f"within {timeout} seconds"
            )
        time.sleep(poll_interval)
if __name__ == "__main__":
    # Specify your list of S3 paths to documents
    s3_paths = [
        's3://your-bucket-name/path/to/document1.pdf',
        's3://your-bucket-name/path/to/document2.pdf',
    ]

    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and always closes the loop afterwards, even if the coroutine
    # raises. It replaces the manual get_event_loop()/run_until_complete()/
    # close() sequence, which is deprecated when no loop is running.
    asyncio.run(async_textract_processing(s3_paths))