4
4
from typing import Dict , List , NamedTuple , Optional , Tuple
5
5
from urllib .request import parse_http_list
6
6
from urllib .parse import urljoin
7
+ import json
7
8
8
9
from nvchecker .api import session , HTTPError
9
10
@@ -57,15 +58,7 @@ async def get_registry_auth_info(registry_host: str) -> AuthInfo:
57
58
58
59
async def get_container_tags (info : Tuple [str , str , AuthInfo ]) -> List [str ]:
59
60
image_path , registry_host , auth_info = info
60
-
61
- auth_params = {
62
- 'scope' : f'repository:{ image_path } :pull' ,
63
- }
64
- if auth_info .service :
65
- auth_params ['service' ] = auth_info .service
66
- res = await session .get (auth_info .realm , params = auth_params )
67
- token = res .json ()['token' ]
68
-
61
+ token = await get_auth_token (auth_info , image_path )
69
62
tags = []
70
63
url = f'https://{ registry_host } /v2/{ image_path } /tags/list'
71
64
@@ -83,20 +76,73 @@ async def get_container_tags(info: Tuple[str, str, AuthInfo]) -> List[str]:
83
76
84
77
return tags
85
78
79
+
80
+ async def get_auth_token (auth_info , image_path ):
81
+ auth_params = {
82
+ 'scope' : f'repository:{ image_path } :pull' ,
83
+ }
84
+ if auth_info .service :
85
+ auth_params ['service' ] = auth_info .service
86
+ res = await session .get (auth_info .realm , params = auth_params )
87
+ token = res .json ()['token' ]
88
+ return token
89
+
90
+
86
91
def parse_next_link (value : str ) -> str :
87
92
ending = '>; rel="next"'
88
93
if value .endswith (ending ):
89
94
return value [1 :- len (ending )]
90
95
else :
91
96
raise ValueError (value )
92
97
98
+
99
+ async def get_container_tag_update_time (info : Tuple [str , str , str , AuthInfo ]):
100
+ '''
101
+ Find the update time of a container tag.
102
+
103
+ In fact, it's the creation time of the image ID referred by the tag. Tag itself does not have any update time.
104
+ '''
105
+ image_path , image_tag , registry_host , auth_info = info
106
+ token = await get_auth_token (auth_info , image_path )
107
+
108
+ # HTTP headers
109
+ headers = {
110
+ 'Authorization' : f'Bearer { token } ' ,
111
+ # Prefer Image Manifest Version 2, Schema 2: https://distribution.github.io/distribution/spec/manifest-v2-2/
112
+ 'Accept' : 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.docker.container.image.v1+json, application/json' ,
113
+ }
114
+
115
+ # Get tag manifest
116
+ url = f'https://{ registry_host } /v2/{ image_path } /manifests/{ image_tag } '
117
+ res = await session .get (url , headers = headers )
118
+ data = res .json ()
119
+ # Schema 1 returns the creation time in the response
120
+ if data ['schemaVersion' ] == 1 :
121
+ return json .loads (data ['history' ][0 ]['v1Compatibility' ])['created' ]
122
+
123
+ # For schema 2, we have to fetch the config's blob
124
+ digest = data ['config' ]['digest' ]
125
+ url = f'https://{ registry_host } /v2/{ image_path } /blobs/{ digest } '
126
+ res = await session .get (url , headers = headers )
127
+ data = res .json ()
128
+ return data ['created' ]
129
+
130
+
93
131
async def get_version (name , conf , * , cache , ** kwargs ):
94
132
image_path = conf .get ('container' , name )
133
+ image_tag = None
134
+ # image tag is optional
135
+ if ':' in image_path :
136
+ image_path , image_tag = image_path .split (':' , 1 )
95
137
registry_host = conf .get ('registry' , 'docker.io' )
96
138
if registry_host == 'docker.io' :
97
139
registry_host = 'registry-1.docker.io'
98
140
99
141
auth_info = await cache .get (registry_host , get_registry_auth_info )
100
142
143
+ # if a tag is given, return the tag's update time, otherwise return the image's tag list
144
+ if image_tag :
145
+ key = image_path , image_tag , registry_host , auth_info
146
+ return await cache .get (key , get_container_tag_update_time )
101
147
key = image_path , registry_host , auth_info
102
148
return await cache .get (key , get_container_tags )
0 commit comments