Skip to content

Commit

Permalink
Fix cluster startup errors (#645)
Browse files Browse the repository at this point in the history
* Always get veth-index before get mac

* InitializeInterfaces: wait for veth index non-negative

* Fix runtime size change error

* Change retry timeout to 60 minutes

* Additional logging for endpoint creation

* Fix pod label delete key error

* EP create wait for bouncer

* log veth to ifindex information

* Log transitd output

* Service wait for net provisioned
  • Loading branch information
phudtran authored Mar 9, 2022
1 parent 9772e47 commit 2671141
Show file tree
Hide file tree
Showing 20 changed files with 103 additions and 60 deletions.
4 changes: 2 additions & 2 deletions mizar/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class OBJ_DEFAULTS:
mizar_ep_vpc_annotation = "mizar.com/vpc"
mizar_ep_subnet_annotation = "mizar.com/subnet"

# 15 minutes of retries
kopf_max_retries = 60
# 60 minutes of retries
kopf_max_retries = 240
kopf_retry_delay = 15


Expand Down
7 changes: 4 additions & 3 deletions mizar/common/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def update_agent_substrate_ep(self, ep, ip, mac, task):
logger.info("update_agent_substrate_ep: {}".format(cmd))
returncode, text = run_cmd(cmd)
logger.info(
"update_agent_substrate_ep {} returns {} {}".format(cmd, text, returncode))
"update_agent_substrate_ep {} {} returns {} {}".format(ep.name, cmd, text, returncode))
if (CONSTANTS.RPC_ERROR_CODE in text or
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
Expand Down Expand Up @@ -173,7 +173,8 @@ def update_ep(self, ep, task):
cmd = f'''{self.trn_cli_update_ep} \'{jsonconf}\''''
logger.info("update_ep: {}".format(cmd))
returncode, text = run_cmd(cmd)
logger.info("update_ep {} returns {} {}".format(cmd, text, returncode))
logger.info("update_ep {} {} returns {} {}".format(
ep.name, cmd, text, returncode))
if (CONSTANTS.RPC_ERROR_CODE in text or
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
Expand Down Expand Up @@ -235,7 +236,7 @@ def update_agent_metadata(self, ep, task):
logger.info("update_agent_metadata: {}".format(cmd))
returncode, text = run_cmd(cmd)
logger.info(
"update_agent_metadata {} returns {} {}".format(cmd, text, returncode))
"update_agent_metadata ep {} {} returns {} {}".format(ep.name, cmd, text, returncode))
if (CONSTANTS.RPC_ERROR_CODE in text or
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
Expand Down
2 changes: 1 addition & 1 deletion mizar/daemon/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def init(benchmark=False):
output = r.stdout.read().decode().strip()
logging.info("Removed existing XDP program: {}".format(output))

cmd = "nsenter -t 1 -m -u -n -i /trn_bin/transitd &"
cmd = "nsenter -t 1 -m -u -n -i /trn_bin/transitd >transitd.log &"
r = subprocess.Popen(cmd, shell=True)
logging.info("Running transitd")
time.sleep(1)
Expand Down
17 changes: 11 additions & 6 deletions mizar/daemon/interface_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,30 @@ def _CreateVethInterface(self, interface):
try:
logger.info("Creating interface {}".format(veth_name))
self.iproute.link('add', ifname=veth_name,
peer=veth_peer, kind='veth')
peer=veth_peer, kind='veth')
except Exception as e:
if e.code == CONSTANTS.NETLINK_FILE_EXISTS_ERROR:
logger.info("Veth already exists! Continuing")
veth_index = get_iface_index(veth_name, self.iproute)
logger.info(
"Veth already exists! veth index is {} Continuing".format(veth_index))
pass

else:
logger.info(
"Unknown exception occured when creating veth {}".format(e))
veth_index = get_iface_index(veth_name, self.iproute)
else:
logger.info("Interface {} already exists!".format(veth_name))

veth_index = get_iface_index(veth_name, self.iproute)
mac_address = ""
if veth_index != -1:
mac_address = get_iface_mac(veth_index, self.iproute)
# Update the mac address with the interface address
address = InterfaceAddress(
version=interface.address.version,
ip_address=interface.address.ip_address,
ip_prefix=interface.address.ip_prefix,
gateway_ip=interface.address.gateway_ip,
mac=get_iface_mac(veth_index, self.iproute),
mac=mac_address,
tunnel_id=interface.address.tunnel_id
)
interface.address.CopyFrom(address)
Expand Down Expand Up @@ -189,7 +193,8 @@ def _ConfigureTransitAgent(self, interface):
update the agent metadata and endpoint.
"""
pod_name = get_pod_name(interface.interface_id.pod_id)
logger.info("Loading transit agent and configuring for pod {}".format(pod_name))
logger.info(
"Loading transit agent and configuring for pod {}".format(pod_name))
self.rpc.load_transit_agent_xdp(interface)

for bouncer in interface.bouncers:
Expand Down
15 changes: 11 additions & 4 deletions mizar/dp/mizar/operators/bouncers/bouncers_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,38 @@ def store_update(self, b):

def set_bouncer_provisioned(self, bouncer):
bouncer.set_status(OBJ_STATUS.bouncer_status_provisioned)
self.store_update(bouncer)
bouncer.update_obj()

def update_bouncers_with_divider(self, div, task):
bouncers = self.store.get_bouncers_of_vpc(div.vpc)
for b in bouncers.values():
for b in list(bouncers.values()):
b.update_vpc(set([div]), task)

def delete_divider_from_bouncers(self, div, task):
bouncers = self.store.get_bouncers_of_vpc(div.vpc)
for b in bouncers.values():
for b in list(bouncers.values()):
b.update_vpc(set([div]), task, False)

def update_endpoint_with_bouncers(self, ep, task):
self.update_endpoint_obj_with_bouncers(ep)
bouncers = self.store.get_bouncers_of_net(ep.net)
if not bouncers:
task.raise_temporary_error(
"Provisiond EP {}: Bouncers not yet ready!".format(ep.name))
eps = set([ep])
for key in bouncers:
for key in list(bouncers):
bouncers[key].update_eps(eps, task)

def update_endpoint_obj_with_bouncers(self, ep):
bouncers = self.store.get_bouncers_of_net(ep.net)
if ep.type == OBJ_DEFAULTS.ep_type_simple or ep.type == OBJ_DEFAULTS.ep_type_host:
ep.update_bouncers_list(bouncers)

def delete_endpoint_from_bouncers(self, ep):
bouncers = self.store.get_bouncers_of_net(ep.net)
eps = set([ep])
for key in bouncers:
for key in list(bouncers):
bouncers[key].delete_eps(eps)
self.store.update_bouncers_of_net(ep.net, bouncers)
if ep.type == OBJ_DEFAULTS.ep_type_simple:
Expand Down
8 changes: 4 additions & 4 deletions mizar/dp/mizar/operators/dividers/dividers_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ def set_divider_provisioned(self, div):

def update_divider_with_bouncers(self, bouncer, net, task):
dividers = self.store.get_dividers_of_vpc(bouncer.vpc).values()
for d in dividers:
for d in list(dividers):
d.update_net(net, task)

def delete_bouncer_from_dividers(self, bouncer, net, task):
dividers = self.store.get_dividers_of_vpc(bouncer.vpc).values()
for d in dividers:
for d in list(dividers):
d.update_net(net, task, False)

def update_net(self, net, task, dividers=None):
if not dividers:
dividers = self.store.get_dividers_of_vpc(net.vpc).values()
for d in dividers:
for d in list(dividers):
d.update_net(net, task)

def delete_net(self, net):
dividers = self.store.get_dividers_of_vpc(net.vpc).values()
for d in dividers:
for d in list(dividers):
d.delete_net(net)

def delete_nets_from_divider(self, nets, divider):
Expand Down
10 changes: 8 additions & 2 deletions mizar/dp/mizar/operators/endpoints/endpoints_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def delete_endpoints_from_bouncers(self, bouncer):

def delete_bouncer_from_endpoints(self, bouncer, task):
eps = self.store.get_eps_in_net(bouncer.net).values()
for ep in eps:
for ep in list(eps):
if ep.type == OBJ_DEFAULTS.ep_type_simple or ep.type == OBJ_DEFAULTS.ep_type_host:
ep.update_bouncers({bouncer.name: bouncer}, task, False)

Expand Down Expand Up @@ -454,7 +454,13 @@ def init_simple_endpoint_interfaces(self, worker_ip, spec, task):
# allocate the mac addresses for us.
logger.info("init_simple_endpoint_interface on {} for {}".format(
worker_ip, spec['name']))
return InterfaceServiceClient(worker_ip).InitializeInterfaces(interfaces, task)
interfaces = InterfaceServiceClient(
worker_ip).InitializeInterfaces(interfaces, task)
for interface in interfaces.interfaces:
if not interface.address.mac:
task.raise_temporary_error(
"Veth did not come up in time for pod {} interface {}".format(pod_id, interface))
return interfaces
return None

def init_host_endpoint_interfaces(self, droplet, ifname, veth_name, peer_name, task):
Expand Down
2 changes: 0 additions & 2 deletions mizar/dp/mizar/workflows/bouncers/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def run(self):

net.bouncers[bouncer.name] = bouncer
dividers_opr.update_divider_with_bouncers(bouncer, net, self)
endpoints_opr.update_bouncer_with_endpoints(bouncer, self)
endpoints_opr.update_endpoints_with_bouncers(bouncer, self)
bouncer.load_transit_xdp_pipeline_stage(self)
bouncers_opr.set_bouncer_provisioned(bouncer)
self.finalize()
4 changes: 4 additions & 0 deletions mizar/dp/mizar/workflows/bouncers/provisioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
from mizar.dp.mizar.operators.bouncers.bouncers_operator import *
from mizar.dp.mizar.operators.droplets.droplets_operator import *
from mizar.dp.mizar.operators.vpcs.vpcs_operator import *
from mizar.dp.mizar.operators.endpoints.endpoints_operator import *

logger = logging.getLogger()

bouncers_opr = BouncerOperator()
vpcs_opr = VpcOperator()
droplets_opr = DropletOperator()
endpoints_opr = EndpointOperator()


class BouncerProvisioned(WorkflowTask):
Expand All @@ -44,5 +46,7 @@ def run(self):
self.param.name, self.param.spec)
bouncer.set_vni(vpcs_opr.store.get_vpc(bouncer.vpc).vni)
bouncer.droplet_obj = droplets_opr.store.get_droplet(bouncer.droplet)
endpoints_opr.update_bouncer_with_endpoints(bouncer, self)
endpoints_opr.update_endpoints_with_bouncers(bouncer, self)
bouncers_opr.store_update(bouncer)
self.finalize()
4 changes: 4 additions & 0 deletions mizar/dp/mizar/workflows/builtins/pods/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def run(self):
"VPC {} has no subnets to allocate pod {}!".format(vpc_name, self.param.name))
spec['vpc'] = vpc_name
spec['subnet'] = subnet_name
else:
if not COMPUTE_PROVIDER.k8s:
self.raise_temporary_error(
"VPC for pod {} not yet created!".format(self.param.name))

spec['vni'] = vpc_opr.store_get(spec['vpc']).vni
spec['droplet'] = droplet_opr.store_get_by_main_ip(spec['hostIP'])
Expand Down
2 changes: 2 additions & 0 deletions mizar/dp/mizar/workflows/builtins/services/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def run(self):
if not net:
self.raise_temporary_error(
"Task: {} Net not yet created.".format(self.__class__.__name__))
if not net.vni:
self.raise_temporary_error("k8sServiceCreate: Net {} does not yet have a vni!".format(net.name))
logger.info("Creating scaled endpoint in subnet: {}.".format(net.name))
ep = endpoints_opr.create_scaled_endpoint(
self.param.name, name, self.param.spec, net, self.param.extra, self.param.body['metadata']['namespace'])
Expand Down
2 changes: 1 addition & 1 deletion mizar/dp/mizar/workflows/droplets/provisioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def run(self):
logger.info("Run {task}".format(task=self.__class__.__name__))
droplet = droplets_opr.get_droplet_stored_obj(
self.param.name, self.param.spec)
for vpc in vpcs_opr.store.get_all_vpcs():
for vpc in list(vpcs_opr.store.get_all_vpcs()):
if droplet.name not in droplets_opr.store.vpc_droplet_store[vpc.name]:
if vpc.name not in nets_opr.store.nets_vpc_store:
self.raise_temporary_error(
Expand Down
12 changes: 11 additions & 1 deletion mizar/dp/mizar/workflows/endpoints/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ def run(self):
self.raise_temporary_error(
"Task: {} Endpoint: {} Droplet Object {} not ready. ".format(self.__class__.__name__, ep.name, ep.droplet))
vpc = vpcs_opr.store.get_vpc(ep.vpc)
# EP create wait for bouncer
net_bouncer = list(
bouncers_opr.store.get_bouncers_of_net(ep.net).values())
if not net_bouncer:
self.raise_temporary_error(
"EP create {}: bouncers not yet ready for net {}".format(ep.name, ep.net))
if net_bouncer[0].status != OBJ_STATUS.bouncer_status_provisioned:
self.raise_temporary_error(
"EP create {}: bouncers not yet ready for net {}".format(ep.name, ep.net))
nets_opr.allocate_endpoint(ep, vpc)
bouncers_opr.update_endpoint_with_bouncers(ep, self)
bouncers_opr.update_endpoint_obj_with_bouncers(ep)

if ep.type == OBJ_DEFAULTS.ep_type_simple or ep.type == OBJ_DEFAULTS.ep_type_host:
if ep.type == OBJ_DEFAULTS.ep_type_host:
logger.info("Activate host interface for vpc {} on droplet {}".format(
Expand Down
2 changes: 1 addition & 1 deletion mizar/dp/mizar/workflows/nets/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def run(self):
if len(dividers_opr.store.get_dividers_of_vpc(n.vpc)) < 1:
self.raise_temporary_error(
"Task: {} Net: {} Dividers not available".format(self.__class__.__name__, n.name))
logger.info("NetCreate Net ip is {}".format(n.ip))
logger.info("NetCreate Net ip is {}, VNI is {}".format(n.ip, n.vni))
nets_opr.create_net_bouncers(n, n.n_bouncers)
nets_opr.store_update(n)
ep = endpoints_opr.create_gw_endpoint(
Expand Down
2 changes: 1 addition & 1 deletion mizar/dp/mizar/workflows/nets/provisioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run(self):
if d[0] == 'change':
self.process_change(net=net, field=d[1], old=d[2], new=d[3])
vpc = vpcs_opr.store.get_vpc(net.vpc)
for droplet in droplets_opr.store.get_all_droplets():
for droplet in list(droplets_opr.store.get_all_droplets()):
logger.info("Net: Available droplets for vpc {}: {}".format(
vpc.name, droplets_opr.store.droplets_store.keys()))
if vpc.name not in droplets_opr.store_get_vpc_to_droplet(droplet):
Expand Down
2 changes: 2 additions & 0 deletions mizar/obj/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ def set_vpc_prefix(self, vpc_prefix):
def update_bouncers(self, bouncers, task, add=True):
for bouncer in bouncers.values():
if add:
logger.info("endpoint obj: update_bouncer: ep {} update agent with bouncer {}".format(
self.name, bouncer.name))
self.bouncers[bouncer.name] = bouncer
self.update_agent_substrate(self, bouncer, task)
else:
Expand Down
9 changes: 5 additions & 4 deletions mizar/store/operator_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,12 @@ def update_pod_namespace_store(self, pod_name, namespace_name):
self.update_namespace_pod_store(namespace_name, pod_name)

def delete_pod_namespace_store(self, pod_name):
ns = self.pod_namespace_store[pod_name]
if ns in self.namespace_pod_store and pod_name in self.namespace_pod_store[ns]:
self.namespace_pod_store[ns].remove(pod_name)
if pod_name in self.pod_namespace_store:
del self.pod_namespace_store[pod_name]
ns = self.pod_namespace_store[pod_name]
if ns in self.namespace_pod_store and pod_name in self.namespace_pod_store[ns]:
self.namespace_pod_store[ns].remove(pod_name)
if pod_name in self.pod_namespace_store:
del self.pod_namespace_store[pod_name]

def update_droplet(self, droplet):
self.droplets_store[droplet.name] = droplet
Expand Down
3 changes: 2 additions & 1 deletion src/dmn/trn_agent_xdp_usr.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ static int _trn_bpf_agent_prog_load_xattr(struct agent_user_metadata_t *md,
_REUSE_MAP_IF_PINNED(conn_track_cache);
_REUSE_MAP_IF_PINNED(ing_pod_label_policy_map);
_REUSE_MAP_IF_PINNED(ing_namespace_label_policy_map);
_REUSE_MAP_IF_PINNED(ing_pod_and_namespace_label_policy_map);
_REUSE_MAP_IF_PINNED(ing_pod_and_namespace_label_policy_map);
_REUSE_MAP_IF_PINNED(tx_stats_map);

/* Only one prog is supported */
Expand Down Expand Up @@ -625,6 +625,7 @@ int trn_agent_metadata_init(struct agent_user_metadata_t *md, char *itf,
TRN_LOG_ERROR("Error retrieving index of interface");
return 1;
}
TRN_LOG_DEBUG("trn_agent_metadata_init: mapped hosted_interface:%s to index %d", itf, md->ifindex);

if (_trn_bpf_agent_prog_load_xattr(md, &prog_load_attr, &md->obj,
&md->prog_fd)) {
Expand Down
Loading

0 comments on commit 2671141

Please sign in to comment.