diff --git a/src/book.py b/src/book.py index 51601ed4..2e672894 100644 --- a/src/book.py +++ b/src/book.py @@ -19,8 +19,9 @@ import decimal import logging import re +from collections import defaultdict from pathlib import Path -from typing import Optional +from typing import Any, Optional import config import misc @@ -32,6 +33,13 @@ class Book: + # Need to track state of duplicate deposit/withdrawal entries + # All deposits/withdrawals are held back until they occur a second time + # Initialize non-existing fields with None once they're called + kraken_held_ops: defaultdict[str, defaultdict[str, Any]] = defaultdict( + lambda: defaultdict(lambda: None) + ) + def __init__(self, price_data: PriceData) -> None: self.price_data = price_data @@ -40,7 +48,7 @@ def __init__(self, price_data: PriceData) -> None: def __bool__(self) -> bool: return bool(self.operations) - def append_operation( + def create_operation( self, operation: str, utc_time: datetime.datetime, @@ -49,22 +57,53 @@ def append_operation( coin: str, row: int, file_path: Path, - ) -> None: + ) -> tr.Operation: try: Op = getattr(tr, operation) except AttributeError: - log.warning( + log.error( "Could not recognize operation `%s` in %s file `%s:%i`.", operation, platform, file_path, row, ) - return + raise RuntimeError - o = Op(utc_time, platform, change, coin, row, file_path) - self.operations.append(o) + op = Op(utc_time, platform, change, coin, row, file_path) + assert isinstance(op, tr.Operation) + return op + + def _append_operation( + self, + operation: tr.Operation, + ) -> None: + + self.operations.append(operation) + + def append_operation( + self, + operation: str, + utc_time: datetime.datetime, + platform: str, + change: decimal.Decimal, + coin: str, + row: int, + file_path: Path, + ) -> None: + + op = self.create_operation( + operation, + utc_time, + platform, + change, + coin, + row, + file_path, + ) + + self._append_operation(op) def _read_binance(self, file_path: Path, version: int = 1) -> None: platform = "binance" @@ -452,31 +491,12 @@ def _read_kraken_ledgers(self, file_path: Path) -> None: operation_mapping = { "spend": "Sell", # Sell ordered via 'Buy Crypto' button "receive": "Buy", # Buy ordered via 'Buy Crypto' button - "transfer": "Airdrop", "reward": "StakingInterest", + "staking": "StakingInterest", "deposit": "Deposit", "withdrawal": "Withdrawal", } - # Need to track state of "duplicate entries" - # for deposits / withdrawals; - # the second deposit and the first withdrawal entry - # need to be skipped. - # dup_state["deposit"] == 0: - # Deposit is broadcast to blockchain - # > Taxable event (is in public trade history) - # dup_state["deposit"] == 1: - # Deposit is credited to Kraken account - # > Skipped - # dup_state["withdrawal"] == 0: - # Withdrawal is requested in Kraken account - # > Skipped - # dup_state["withdrawal"] == 1: - # Withdrawal is broadcast to blockchain - # > Taxable event (is in public trade history) - dup_state = {"deposit": 0, "withdrawal": 0} - dup_skip = {"deposit": 1, "withdrawal": 0} - with open(file_path, encoding="utf8") as f: reader = csv.reader(f) @@ -515,42 +535,58 @@ def _read_kraken_ledgers(self, file_path: Path) -> None: balance, ) = columns else: - raise RuntimeError( - "Unknown Kraken ledgers format: " + log.error( + "{file_path}: Unknown Kraken ledgers format: " "Number of rows do not match known versions." ) + raise RuntimeError row = reader.line_num - # Skip "duplicate entries" for deposits / withdrawals - if _type in dup_state.keys(): - skip = dup_state[_type] == dup_skip[_type] - dup_state[_type] = (dup_state[_type] + 1) % 2 - if skip: - continue - # Parse data. utc_time = datetime.datetime.strptime(_utc_time, "%Y-%m-%d %H:%M:%S") utc_time = utc_time.replace(tzinfo=datetime.timezone.utc) change = misc.force_decimal(_amount) + # remove the appended .S for staked assets + _asset = _asset.removesuffix(".S") coin = kraken_asset_map.get(_asset, _asset) fee = misc.force_decimal(_fee) operation = operation_mapping.get(_type) if operation is None: if _type == "trade": operation = "Sell" if change < 0 else "Buy" - elif _type in ["margin trade", "rollover", "settled"]: + elif _type in ["margin trade", "rollover", "settled", "margin"]: log.error( - f"{file_path}: {row}: Margin trading is " - "currently not supported. " - "Please create an Issue or PR." + f"{file_path} row {row}: Margin trading is currently not " + "supported. Please create an Issue or PR." ) raise RuntimeError + elif _type == "transfer": + if num_columns == 9: + # for backwards compatibility assume Airdrop for staking + log.warning( + f"{file_path} row {row}: Staking is not supported for" + "old Kraken ledger formats. " + "Please create an Issue or PR." + ) + operation = "Airdrop" + elif subtype == "stakingfromspot": + operation = "Staking" + elif subtype == "stakingtospot": + operation = "StakingEnd" + elif subtype in ["spottostaking", "spotfromstaking"]: + # duplicate entries for staking actions + continue + else: + log.error( + f"{file_path} row {row}: Order subtype '{subtype}' is " + "currently not supported. Please create an Issue or PR." + ) + raise RuntimeError else: log.error( - f"{file_path}: {row}: Other order type '{_type}' " - "is currently not supported. " - "Please create an Issue or PR." + f"{file_path} row {row}: Other order type '{_type}' is " + "currently not supported. Please create an Issue or PR." ) raise RuntimeError change = abs(change) @@ -560,22 +596,109 @@ def _read_kraken_ledgers(self, file_path: Path) -> None: assert coin assert change - self.append_operation( - operation, utc_time, platform, change, coin, row, file_path - ) + # Skip duplicate entries for deposits / withdrawals and additional + # deposit / withdrawal lines for staking / unstaking / staking reward + # actions. + # The second deposit and the first withdrawal need to be considered, + # since these are the points in time where the user actually has the + # assets at their disposal. The first deposit and second withdrawal are + # in the public trade history and are skipped. + # For staking / unstaking / staking reward actions, deposits / + # withdrawals only occur once and will be ignored. + # The "appended" flag stores if an operation for a given refid has + # already been appended to the operations list: + # == None: Initial value (first occurrence) + # == False: No operation has been appended (second occurrence) + # == True: Operation has already been appended, this should not happen + if operation in ["Deposit", "Withdrawal"]: + # First, create the operations + op = self.create_operation( + operation, utc_time, platform, change, coin, row, file_path + ) + op_fee = None + if fee != 0: + op_fee = self.create_operation( + "Fee", utc_time, platform, fee, coin, row, file_path + ) + # If this is the first occurrence, set the "appended" flag to false + # and don't append the operation to the list. Instead, store the + # data for verifying or appending it later. + if self.kraken_held_ops[refid]["appended"] is None: + self.kraken_held_ops[refid]["appended"] = False + self.kraken_held_ops[refid]["operation"] = op + self.kraken_held_ops[refid]["operation_fee"] = op_fee + # If this is the second occurrence, append a new operation, set the + # "appended" flag to True and assert that the data of this operation + # agrees with the data of the first occurrence. + elif self.kraken_held_ops[refid]["appended"] is False: + self.kraken_held_ops[refid]["appended"] = True + try: + assert isinstance( + op, type(self.kraken_held_ops[refid]["operation"]) + ), "operation" + assert ( + op.change + == self.kraken_held_ops[refid]["operation"].change + ), "change" + assert ( + op.coin == self.kraken_held_ops[refid]["operation"].coin + ), "coin" + except AssertionError as e: + log.error( + f"{file_path} row {row}: Parameters for refid {refid} " + f"({operation}) do not agree: {e}. " + "Please create an Issue or PR." + ) + raise RuntimeError + # For deposits, this is all we need to do before appending the + # operation. For withdrawals, we need to append the first + # withdrawal as soon as the second withdrawal occurs. Therefore, + # overwrite the operation with the stored first withdrawal. + if operation == "Withdrawal": + op = self.kraken_held_ops[refid]["operation"] + op_fee = self.kraken_held_ops[refid]["operation_fee"] + # Finally, append the operations and delete the stored + # operations to reduce memory consumption + self._append_operation(op) + if op_fee: + self._append_operation(op_fee) + del self.kraken_held_ops[refid]["operation"] + del self.kraken_held_ops[refid]["operation_fee"] + # If an operation with the same refid has been already appended, + # this is the third occurrence. Throw an error if this happens. + elif self.kraken_held_ops[refid]["appended"] is True: + log.error( + f"{file_path} row {row}: More than two entries with refid " + f"{refid} should not exist ({operation}). " + "Please create an Issue or PR." + ) + raise RuntimeError + # This should never happen + else: + log.error( + f"{file_path} row {row}: Unknown value for appended " + f"operation flag {self.kraken_held_ops[refid]['appended']}." + "Please create an Issue or PR." + ) + raise TypeError - if fee != 0: + # for all other operation types + else: self.append_operation( - "Fee", utc_time, platform, fee, coin, row, file_path + operation, utc_time, platform, change, coin, row, file_path ) - - assert dup_state["deposit"] == 0, ( - "Orphaned deposit. (Must always come in pairs). " "Is your file corrupted?" - ) - assert dup_state["withdrawal"] == 0, ( - "Orphaned withdrawal. (Must always come in pairs). " - "Is your file corrupted?" - ) + if fee != 0: + self.append_operation( + "Fee", utc_time, platform, fee, coin, row, file_path + ) + if operation == "StakingInterest": + # For Kraken, the rewarded coins are added to the staked + # portfolio. TODO (for MULTI_DEPOT only): Directly add the + # rewarded coins to the staking depot (not like here with the + # detour of adding it to spot and then staking the same amount) + self.append_operation( + "Staking", utc_time, platform, change, coin, row, file_path + ) def _read_kraken_ledgers_old(self, file_path: Path) -> None: diff --git a/src/core.py b/src/core.py index 67326ec4..e1d11dd7 100644 --- a/src/core.py +++ b/src/core.py @@ -202,32 +202,41 @@ class Fiat(Enum): # Converts clean fiat / clean crypto pairs to "dirty" API asset pairs # (e.g. ETHEUR -> XETHZEUR) # Analyzed using asset pairs API data: -# https://api.kraken.com/0/public/AssetPairs (retrieved at 2021-02-21) +# https://api.kraken.com/0/public/AssetPairs (retrieved at 2022-01-02) kraken_asset_map = { # Fiat: - "ZUSD": "USD", - "ZEUR": "EUR", - "ZCAD": "CAD", - "ZJPY": "JPY", - "ZGBP": "GBP", "CHF": "CHF", "ZAUD": "AUD", + "ZCAD": "CAD", + "ZEUR": "EUR", + "ZGBP": "GBP", + "ZJPY": "JPY", + "ZUSD": "USD", # Crypto: - "XXBT": "XBT", + "XETC": "ETC", "XETH": "ETH", "XLTC": "LTC", + "XMLN": "MLN", + "XREP": "REP", + "XXBT": "XBT", "XXDG": "XDG", + "XXLM": "XLM", + "XXMR": "XMR", "XXRP": "XRP", + "XZEC": "ZEC", } # Only these asset pairs violate the rule # "clean name" + "clean name" = "asset pair" kraken_pair_map = { "USDTUSD": "USDTZUSD", - "XETCETH": "XETCXETH", - "XETCXBT": "XETCXXBT", - "XETCEUR": "XETCZEUR", - "XETCUSD": "XETCZUSD", + "ETCETH": "XETCXETH", + "ETCXBT": "XETCXXBT", + "ETCEUR": "XETCZEUR", + "ETCUSD": "XETCZUSD", + "ETH2ETH": "ETH2.SETH", + "ETH2EUR": "XETHZEUR", + "ETH2USD": "XETHZUSD", "ETHXBT": "XETHXXBT", "ETHCAD": "XETHZCAD", "ETHEUR": "XETHZEUR", @@ -238,36 +247,36 @@ class Fiat(Enum): "LTCEUR": "XLTCZEUR", "LTCJPY": "XLTCZJPY", "LTCUSD": "XLTCZUSD", - "XMLNETH": "XMLNXETH", - "XMLNXBT": "XMLNXXBT", - "XMLNEUR": "XMLNZEUR", - "XMLNUSD": "XMLNZUSD", - "XREPETH": "XREPXETH", - "XREPXBT": "XREPXXBT", - "XREPEUR": "XREPZEUR", - "XREPUSD": "XREPZUSD", + "MLNETH": "XMLNXETH", + "MLNXBT": "XMLNXXBT", + "MLNEUR": "XMLNZEUR", + "MLNUSD": "XMLNZUSD", + "REPETH": "XREPXETH", + "REPXBT": "XREPXXBT", + "REPEUR": "XREPZEUR", + "REPUSD": "XREPZUSD", "XBTCAD": "XXBTZCAD", "XBTEUR": "XXBTZEUR", "XBTGBP": "XXBTZGBP", "XBTJPY": "XXBTZJPY", "XBTUSD": "XXBTZUSD", "XDGXBT": "XXDGXXBT", - "XXLMXBT": "XXLMXXBT", - "XXLMAUD": "XXLMZAUD", - "XXLMEUR": "XXLMZEUR", - "XXLMGBP": "XXLMZGBP", - "XXLMUSD": "XXLMZUSD", - "XXMRXBT": "XXMRXXBT", - "XXMREUR": "XXMRZEUR", - "XXMRUSD": "XXMRZUSD", + "XLMXBT": "XXLMXXBT", + "XLMAUD": "XXLMZAUD", + "XLMEUR": "XXLMZEUR", + "XLMGBP": "XXLMZGBP", + "XLMUSD": "XXLMZUSD", + "XMRXBT": "XXMRXXBT", + "XMREUR": "XXMRZEUR", + "XMRUSD": "XXMRZUSD", "XRPXBT": "XXRPXXBT", "XRPCAD": "XXRPZCAD", "XRPEUR": "XXRPZEUR", "XRPJPY": "XXRPZJPY", "XRPUSD": "XXRPZUSD", - "XZECXBT": "XZECXXBT", - "XZECEUR": "XZECZEUR", - "XZECUSD": "XZECZUSD", + "ZECXBT": "XZECXXBT", + "ZECEUR": "XZECZEUR", + "ZECUSD": "XZECZUSD", "EURUSD": "ZEURZUSD", "GBPUSD": "ZGBPZUSD", "USDCAD": "ZUSDZCAD", diff --git a/src/price_data.py b/src/price_data.py index 9dcabef5..b5b330a4 100644 --- a/src/price_data.py +++ b/src/price_data.py @@ -41,6 +41,9 @@ class PriceData: + # list of Kraken pairs that returned invalid arguments error + kraken_invalid_pairs: list[str] = [] + def get_db_path(self, platform: str) -> Path: return Path(config.DATA_PATH, f"{platform}.db") @@ -315,10 +318,12 @@ def _get_price_kraken( For this we fetch one chunk of the trade history, starting `minutes_step`minutes before this timestamp. We then walk through the history until the closest timestamp match is - found. Otherwise, we start another 10 minutes earlier and try again. + found. Otherwise (if all received price data points are newer than the desired + timestamp), we start another 10 minutes earlier and try again. (Exiting with a warning and zero price after hitting the arbitrarily chosen offset limit of 120 minutes). If the initial offset is already - too large, recursively retry by reducing the offset step, + too large (i.e. all received price data points are older than the desired + timestamp), recursively retry by reducing the offset step, down to 1 minute. Documentation: https://www.kraken.com/features/api @@ -336,8 +341,7 @@ def _get_price_kraken( """ target_timestamp = misc.to_ms_timestamp(utc_time) root_url = "https://api.kraken.com/0/public/Trades" - pair = base_asset + quote_asset - pair = kraken_pair_map.get(pair, pair) + inverse = False minutes_offset = 0 while minutes_offset < 120: @@ -346,10 +350,28 @@ def _get_price_kraken( since = misc.to_ns_timestamp( utc_time - datetime.timedelta(minutes=minutes_offset) ) - url = f"{root_url}?{pair=:}&{since=:}" num_retries = 10 while num_retries: + pair = base_asset + quote_asset + pair = kraken_pair_map.get(pair, pair) + + # if the pair is invalid, invert it + if pair in self.kraken_invalid_pairs: + inverse = not inverse + base_asset, quote_asset = quote_asset, base_asset + pair = base_asset + quote_asset + pair = kraken_pair_map.get(pair, pair) + # if inverted pair is also invalid, throw error + if pair in self.kraken_invalid_pairs: + log.error( + f"Could not retrieve trades for {pair} or inverse pair, " + "invalid arguments error. Please create an Issue or PR." + ) + raise RuntimeError + + url = f"{root_url}?{pair=:}&{since=:}" + log.debug( f"Querying trades for {pair} at {utc_time} " f"(offset={minutes_offset}m): Calling %s", @@ -361,22 +383,29 @@ def _get_price_kraken( if not data["error"]: break + elif data["error"] == ['EGeneral:Invalid arguments']: + # add pair to invalid pairs list + # leads to inversion of pair next time + log.warning( + f"Invalid arguments error for {pair} at {utc_time} " + f"(offset={minutes_offset}m): " + f"Blocking pair and trying inverse coin pair ..." + ) + self.kraken_invalid_pairs.append(pair) else: num_retries -= 1 sleep_duration = 2 ** (10 - num_retries) log.warning( - f"Querying trades for {pair} at {utc_time} " - f"(offset={minutes_offset}m): " - f"Could not retrieve trades: {data['error']}. " + f"Could not retrieve trades for {pair} at {utc_time} " + f"(offset={minutes_offset}m): {data['error']}. " f"Retry in {sleep_duration} s ..." ) time.sleep(sleep_duration) continue else: log.error( - f"Querying trades for {pair} at {utc_time} " - f"(offset={minutes_offset}m): " - f"Could not retrieve trades: {data['error']}" + f"Could not retrieve trades for {pair} at {utc_time} " + f"(offset={minutes_offset}m): {data['error']}. " ) raise RuntimeError("Kraken response keeps having error flags.") @@ -388,14 +417,34 @@ def _get_price_kraken( ) # The desired timestamp is in the past; increase the offset + # desired timestamp is smaller than all timestamps of the received data if closest_match_index == -1: continue # The desired timestamp is in the future + # desired timestamp is larger than all timestamps of the received data if closest_match_index == len(data_timestamps_ms) - 1: - - if minutes_step == 1: - # Cannot remove interval any further; give up + if len(data_timestamps_ms) < 100: + # The API returns the last 1000 trades. If less than 100 trades are + # received, it can be assumed that we've received the last trade. + price_timestamp = data_timestamps_ms[closest_match_index] + log.debug( + "Accepting price from " + f"{datetime.datetime.fromtimestamp(price_timestamp/1000.0)} " + f"as latest price for {pair} at {utc_time}" + ) + # This should normally only happen for virtual sells, therefore + # raise a warning if the target timestamp is older than one hour + now_timestamp = misc.to_ms_timestamp( + datetime.datetime.now().astimezone() + ) + if target_timestamp < now_timestamp - 3600 * 1000: + log.warning( + f"Timestamp for {pair} at {utc_time} is older than one " + "hour, still accepted latest received trading price" + ) + elif minutes_step == 1: + # Cannot reduce interval any further; give up break else: # We missed the desired timestamp because our initial step @@ -408,11 +457,12 @@ def _get_price_kraken( ) price = misc.force_decimal(data[closest_match_index][0]) + if inverse: + price = misc.reciprocal(price) return price log.warning( - f"Querying trades for {pair} at {utc_time}: " - f"Failed to find matching exchange rate. " + f"Failed to find matching exchange rate for {pair} at {utc_time}: " "Please create an Issue or PR." ) return decimal.Decimal()