@@ -1,6 +1,6 @@
 import asyncio
 import concurrent.futures
-import logging
+from logging import getLogger
 from datetime import datetime, timedelta, timezone
 from typing import Awaitable, Callable, Iterator, Optional, Tuple, TypeVar
@@ -14,6 +14,8 @@
 from blotter.streaming import StreamingID, StreamingManager
 from google.cloud import bigquery
 
+logger = getLogger(__name__)
+
 _T = TypeVar("_T")
 
 
@@ -64,7 +66,7 @@ def resume_streaming(self) -> None:
         """
 
         streaming_ids = list(self._streaming_manager.resume_streaming(self._ib_thread))
-        logging.info(f"Resumed streaming IDs {streaming_ids}")
+        logger.info(f"Resumed streaming IDs {streaming_ids}")
 
     def _run_in_ib_thread(
         self, fn: Callable[[ib_insync.IB], Awaitable[_T]]
@@ -87,7 +89,7 @@ def LoadHistoricalData(
         request: blotter_pb2.LoadHistoricalDataRequest,
         context: grpc.ServicerContext,
     ) -> Iterator[blotter_pb2.LoadHistoricalDataResponse]:
-        logging.info(f"LoadHistoricalData: {request}")
+        logger.info(f"LoadHistoricalData: {request}")
 
         td = request_helpers.duration_timedelta_atleast(request.duration)
         end_date = datetime.fromtimestamp(request.endTimestampUTC, tz=timezone.utc)
@@ -96,7 +98,7 @@ def LoadHistoricalData(
             duration = request_helpers.duration_str(request.duration)
             start_date = end_date - timedelta(seconds=1)
         else:
-            logging.debug(f"Splitting requested duration {td}")
+            logger.debug(f"Splitting requested duration {td}")
             duration = request_helpers.duration_str(
                 blotter_pb2.Duration(count=10, unit=blotter_pb2.Duration.TimeUnit.DAYS)
             )
@@ -108,7 +110,7 @@ async def _backfill(
         ) -> Tuple[datetime, bigquery.LoadJob]:
             nonlocal end_date
 
-            logging.info(
+            logger.info(
                 f"Backfilling {duration} from {end_date} of {request.contractSpecifier}"
             )
 
@@ -126,15 +128,15 @@ async def _backfill(
         while end_date > start_date:
             (end_date, job) = self._run_in_ib_thread(_backfill).result()
 
-            logging.info(f"BigQuery backfill job launched: {job.job_id}")
+            logger.info(f"BigQuery backfill job launched: {job.job_id}")
             yield blotter_pb2.LoadHistoricalDataResponse(backfillJobID=job.job_id)
 
     def StartRealTimeData(
         self,
         request: blotter_pb2.StartRealTimeDataRequest,
         context: grpc.ServicerContext,
     ) -> blotter_pb2.StartRealTimeDataResponse:
-        logging.info(f"StartRealTimeData: {request}")
+        logger.info(f"StartRealTimeData: {request}")
 
         async def _start_stream(ib_client: ib_insync.IB) -> StreamingID:
             return await self._streaming_manager.start_stream(
@@ -145,7 +147,7 @@ async def _start_stream(ib_client: ib_insync.IB) -> StreamingID:
             )
 
         streaming_id = self._run_in_ib_thread(_start_stream).result()
-        logging.debug(f"Real-time bars streaming ID: {streaming_id}")
+        logger.debug(f"Real-time bars streaming ID: {streaming_id}")
 
         return blotter_pb2.StartRealTimeDataResponse(requestID=streaming_id)
 
@@ -154,7 +156,7 @@ def CancelRealTimeData(
         request: blotter_pb2.CancelRealTimeDataRequest,
         context: grpc.ServicerContext,
     ) -> blotter_pb2.CancelRealTimeDataResponse:
-        logging.info(f"CancelRealTimeData: {request}")
+        logger.info(f"CancelRealTimeData: {request}")
 
         async def _cancel_stream(ib_client: ib_insync.IB) -> None:
             await self._streaming_manager.cancel_stream(
@@ -167,23 +169,23 @@ async def _cancel_stream(ib_client: ib_insync.IB) -> None:
     def HealthCheck(
         self, request: blotter_pb2.HealthCheckRequest, context: grpc.ServicerContext,
     ) -> blotter_pb2.HealthCheckResponse:
-        logging.info(f"HealthCheck: {request}")
+        logger.info(f"HealthCheck: {request}")
         return blotter_pb2.HealthCheckResponse()
 
     def SnapshotOptionChain(
         self,
         request: blotter_pb2.SnapshotOptionChainRequest,
         context: grpc.ServicerContext,
     ) -> blotter_pb2.SnapshotOptionChainResponse:
-        logging.info(f"SnapshotOptionChain: {request}")
+        logger.info(f"SnapshotOptionChain: {request}")
 
         async def _snapshot(ib_client: ib_insync.IB) -> bigquery.LoadJob:
             return await snapshot_options(
                 ib_client, request.contractSpecifier, self._error_handler
             )
 
         job = self._run_in_ib_thread(_snapshot).result()
-        logging.info(f"BigQuery import job launched: {job.job_id}")
+        logger.info(f"BigQuery import job launched: {job.job_id}")
 
         return blotter_pb2.SnapshotOptionChainResponse(importJobID=job.job_id)
 
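The substance of this change is the switch from logging through the root logger (`logging.info(...)`) to a logger named after the module (`getLogger(__name__)`). Below is a minimal, self-contained sketch of that pattern using only the standard library; `do_work`, the handler setup, and the format string are illustrative assumptions, not code from this repository:

# logger_pattern.py -- minimal sketch of the per-module logger pattern
# adopted by this change. Names below (do_work, the format string) are
# illustrative assumptions, not code from this repository.
import logging
from logging import getLogger

# One logger per module, named after the module itself. Records emitted
# here are tagged with this module's name rather than "root".
logger = getLogger(__name__)


def do_work(task: str) -> None:
    logger.info(f"Starting task: {task}")
    logger.debug(f"Task details: {task!r}")


if __name__ == "__main__":
    # Configuring handlers and levels is the application's job, done once
    # at startup; modules only emit through their own logger.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    do_work("example")

Because each record now carries the emitting module's name instead of "root", a deployment can raise or lower verbosity for one package, e.g. logging.getLogger("blotter").setLevel(logging.DEBUG), without affecting everything else.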