-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathconfig.py
185 lines (140 loc) · 6.54 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import glob
import pathlib
from typing import List, Union, Optional, Literal, Iterable
from pydantic import validator, conint, NonNegativeInt, constr, PositiveInt, root_validator, ValidationError, Field
from pydantic_yaml import YamlModel, YamlStrEnum
import settings
class ScriptsConfigV1(YamlModel):
build_outside_vm: str = ""
build_inside_vm: str
start_once: str
class FileDeployConfigV1(YamlModel):
source: str = ""
sources: List[str] = []
destination: str = "/home/$SERVICE"
def prepare_for_upload(self, config: "DeployConfig", config_folder: pathlib.Path) -> Iterable["FileDeployConfigV1"]:
# Packer's file provisioner works with "sources" option very bad: i.e., doesn't support directories there,
# so we convert "sources" into multiple Files with "source"
if self.sources:
files = [
FileDeployConfigV1(source=source, destination=self.destination) for source in self._unfold_globs(self.sources, config_folder)
]
for file in files:
yield from file.prepare_for_upload(config, config_folder)
return
# Support for glob in "source": interpreter them as "sources"
if self.source.startswith("/"):
files = glob.glob(self.source)
else:
files = config_folder.glob(self.source)
if len(list(files)) > 1:
yield from FileDeployConfigV1(
sources=[self.source],
destination=self.destination,
).prepare_for_upload(config, config_folder)
return
destination = substitute_variables(self.destination, config)
if config_folder / self.source and not destination.endswith("/"):
destination += "/"
yield FileDeployConfigV1(
source=self.source,
destination=destination,
)
@staticmethod
def _unfold_globs(sources: List[str], folder: pathlib.Path) -> List[str]:
result = []
for source in sources:
trailing_slash = "/" if source.endswith("/") else ""
result += [p.relative_to(folder).as_posix() + trailing_slash for p in folder.glob(source)]
return result
class ListenerProtocol(YamlStrEnum):
TCP = "tcp"
HTTP = "http"
class ProxySource(YamlStrEnum):
TEAM = "team"
class ProxyLimit(YamlModel):
source: ProxySource
location: Optional[str] = None
limit: str
burst: NonNegativeInt = 0
simultaneous_connections: Optional[int] = None
proxy_websockets: bool = False
class UpstreamProtocol(YamlStrEnum):
TCP = "tcp"
HTTP = "http"
HTTPS = "https"
class UpstreamConfigV1(YamlModel):
host_index: PositiveInt
port: conint(gt=0, le=65535)
protocol: UpstreamProtocol = UpstreamProtocol.HTTP
client_certificate: Optional[str]
class ListenerConfigV1(YamlModel):
protocol: ListenerProtocol
port: Optional[conint(gt=0, le=65535)] = None
hostname: Optional[str] = None
certificate: Optional[str] = None
client_certificate: Optional[str] = None
tcp_simultaneous_connections: Optional[int] = None
default: bool = False
@validator("certificate", "client_certificate")
def _validate_certificate(cls, certificate: Optional[str]) -> Optional[str]:
if certificate is not None:
assert certificate in settings.PROXY_CERTIFICATES, f"unknown certificate name: {certificate}"
return certificate
@validator("port", always=True)
def _validate_port(cls, port: Optional[int], values) -> int:
if port is not None:
return port
if values["protocol"] == "http":
if "certificate" not in values or values["certificate"] is None:
return 80
if values["certificate"] is not None:
return 443
raise ValidationError(f"Port should be specified for proxy of type {values['protocol']}")
@validator("tcp_simultaneous_connections")
def _validate_tcp_simultaneous_connections(cls, tcp_simultaneous_connections: Optional[int], values) -> Optional[int]:
if values.get("protocol") == "http":
raise ValidationError("HTTP Proxy can not have listener.tcp_simultaneous_connections parameter")
return tcp_simultaneous_connections
class ProxyConfigV1(YamlModel):
disable: bool = Field(alias="__disable", default=False) # Hidden option for disabling (and removing) proxies
name: constr(min_length=1)
listener: ListenerConfigV1
upstream: UpstreamConfigV1
limits: List[ProxyLimit] = []
dns_records: List[str] = []
template: Optional[str] = None
@root_validator
def validate_object(cls, values):
if "listener" in values and values["listener"].protocol == "tcp":
for limit in values.get("limits", []):
if limit.location:
raise ValidationError("Limit in TCP proxy can not have a 'location' parameter")
if limit.proxy_websocket:
raise ValidationError("Limit in TCP proxy can not have a 'proxy_websocket' parameter")
if limit.simultaneous_connections:
raise ValidationError("Limit in TCP proxy can not have a 'simultaneous_connections' parameter. "
"Use listener.tcp_simultaneous_connections instead.")
if values["listener"].port is None:
raise ValidationError("TCP proxy must have listener.port parameter")
elif "listener" in values and values["listener"].protocol == "http":
for limit in values.get("limits", []):
if not limit.location:
raise ValidationError("Limit in HTTP proxy must have a location parameter")
return values
@validator("limits")
def validate_limits(cls, limits: List[ProxyLimit], values) -> List[ProxyLimit]:
if "listener" in values and values["listener"].protocol == "tcp":
if len(limits) > 1:
raise ValidationError(f"TCP proxy can have at most one limit, but you specified {len(limits)}")
return limits
class DeployConfigV1(YamlModel):
version: Literal[1]
service: constr(regex='^[a-z0-9_-]{1,30}$')
username: Optional[constr(regex='^[a-z0-9_-]{1,30}$')] = None
scripts: ScriptsConfigV1
files: List[FileDeployConfigV1]
proxies: List[ProxyConfigV1] = []
DeployConfig = Union[DeployConfigV1]
def substitute_variables(data: str, config: DeployConfig) -> str:
return data.replace("$SERVICE", config.service if config.service else "").replace("$USERNAME", config.username if config.username else "")