Skip to content

Commit

Permalink
feature(startup): load actual instance numbers from Occopus and Docker
Browse files: browse the repository at this point in the history
  • Loading branch information
smith4 committed Jul 6, 2018
1 parent 27e523e commit 9f32737
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
15 changes: 14 additions & 1 deletion handle_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def query_list_of_nodes(endpoint,status='ready'):
log.exception('(Q) Query of docker nodes failed.')
return None


def scale_docker_service(endpoint,service_name,replicas):
log=logging.getLogger('pk_docker')
log.info('(S) => m_container_count: {0}'.format(replicas))
Expand All @@ -40,6 +39,20 @@ def scale_docker_service(endpoint,service_name,replicas):
log.warning('(S) Scaling of docker service "{0}" failed: {1}'.format(service_name,str(e)))
return

def query_docker_service_replicas(endpoint,service_name):
    """Return the replica count configured for a Docker Swarm service.

    The value is read from ``Spec.Mode.Replicated.Replicas`` of the
    ``inspect_service`` response for ``service_name``.

    :param endpoint: address handed to ``docker.APIClient``.
    :param service_name: name of the service to inspect.
    :returns: the replica count; ``1`` when running in simulation mode,
        when the field is absent from the response, or when the query fails.
    """
    log=logging.getLogger('pk_docker')
    instance = 1
    if pk_config.simulate():
        # Hand back the default rather than None: the caller stores this
        # value as m_container_count and later converts it with int().
        return instance
    client = docker.APIClient(endpoint)
    try:
        response = client.inspect_service(service_name)
        instance = response.get('Spec',dict()).get('Mode',dict()).get('Replicated',dict()).get('Replicas',1)
        log.debug('(C) => m_container_count for {0}: {1}'.format(service_name,instance))
    except Exception as e:
        # Best effort: a failed query is logged and the default is kept.
        log.warning('(C) Querying docker service "{0}" replicas failed: {1}'.format(service_name,str(e)))
    return instance

def query_service_network(endpoint, stack_name, service_name):
id = None
log=logging.getLogger('pk_docker')
Expand Down
12 changes: 11 additions & 1 deletion handle_occopus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import requests
import pk_config

def scale_occopus_worker_node(endpoint,infra_name,worker_name,replicas):
def scale_worker_node(endpoint,infra_name,worker_name,replicas):
log=logging.getLogger('pk_occopus')
log.info('(S) => m_node_count: {0}'.format(replicas))
wscall = '{0}/infrastructures/{1}/scaleto/{2}/{3}'.format(endpoint,infra_name,worker_name,replicas)
Expand All @@ -12,4 +12,14 @@ def scale_occopus_worker_node(endpoint,infra_name,worker_name,replicas):
log.debug('-->response: {0}'.format(response))
return

def query_number_of_worker_nodes(endpoint,infra_name,worker_name):
    """Ask Occopus for the target instance count of a worker node.

    Issues ``GET <endpoint>/infrastructures/<infra_name>`` and reads
    ``<worker_name>.scaling.target`` from the JSON response.

    :returns: ``1`` in simulation mode (no request is sent); otherwise the
        ``target`` value from the response, or ``0`` if it is missing.
    """
    log = logging.getLogger('pk_occopus')
    wscall = '{0}/infrastructures/{1}'.format(endpoint,infra_name)
    log.debug('-->curl -X GET {0}'.format(wscall))
    if pk_config.simulate():
        # Simulation: skip the HTTP call and report the default.
        return 1
    response = requests.get(wscall).json()
    scaling_info = response.get(worker_name,dict()).get('scaling',dict())
    instances = scaling_info.get('target',0)
    log.debug('-->instances: {0}, response: {1}'.format(instances,response))
    return instances

42 changes: 37 additions & 5 deletions policy_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@ def resolve_queries(policy_yaml):
template = jinja2.Template(policy_yaml)
return template.render(values)

def get_full_service_name(policy, service_name):
    """Build the service name, prefixed by the stack name if there is one.

    :param policy: mapping that may carry a ``'stack'`` entry.
    :param service_name: short name of the service.
    :returns: ``'<stack>_<service_name>'`` when ``policy['stack']`` is
        present and is neither ``None`` nor the empty string; otherwise
        ``service_name`` formatted as a string.
    """
    stack = policy.get('stack','')
    if stack in (None, ''):
        return '{0}'.format(service_name)
    return '{0}_{1}'.format(stack,service_name)

def perform_service_scaling(policy,service_name):
    """Scale the Docker service(s) named ``service_name`` per the policy.

    For every entry of ``policy['scaling']['services']`` that has an
    ``m_container_count`` output and whose ``name`` equals ``service_name``,
    the calculated count is clamped to ``[min_instances, max_instances]``
    and passed to ``dock.scale_docker_service`` together with the
    stack-qualified service name.

    :param policy: policy dictionary holding ``scaling.services``.
    :param service_name: short (unprefixed) name of the service to scale.
    """
    for srv in policy['scaling']['services']:
        if 'm_container_count' not in srv.get('outputs',dict()):
            continue
        if srv['name'] != service_name:
            continue
        log.debug('(S) Scaling values for service "{0}": min:{1} max:{2} calculated:{3}'
                  .format(srv['name'],srv['min_instances'],srv['max_instances'],srv['outputs']['m_container_count']))
        containercount = max(min(int(srv['outputs']['m_container_count']),int(srv['max_instances'])),int(srv['min_instances']))
        # Keep the qualified name in its own variable: overwriting the
        # service_name parameter would make later iterations compare
        # against the stack-prefixed name instead of the requested one.
        full_service_name = get_full_service_name(policy, srv['name'])
        config = pk_config.config()
        dock.scale_docker_service(config['swarm_endpoint'],full_service_name,containercount)

Expand All @@ -48,7 +52,7 @@ def perform_worker_node_scaling(policy):
.format(node['min_instances'],node['max_instances'],node['outputs']['m_node_count']))
nodecount = max(min(int(node['outputs']['m_node_count']),int(node['max_instances'])),int(node['min_instances']))
config = pk_config.config()
occo.scale_occopus_worker_node(
occo.scale_worker_node(
endpoint=config['occopus_endpoint'],
infra_name=config['occopus_infra_name'],
worker_name=config['occopus_worker_name'],
Expand Down Expand Up @@ -102,6 +106,20 @@ def load_policy_from_file(policyfile):
policy = f.read()
return policy

def set_worker_node_instance_number(policy,instances):
    """Store ``instances`` as ``policy['scaling']['nodes']['outputs']['m_node_count']``.

    Any missing intermediate dictionary (``scaling``, ``nodes``,
    ``outputs``) is created; existing entries are left untouched.
    The policy is modified in place and nothing is returned.
    """
    scaling = policy.setdefault('scaling',dict())
    nodes = scaling.setdefault('nodes',dict())
    outputs = nodes.setdefault('outputs',dict())
    outputs['m_node_count'] = instances

def set_docker_service_instance_number(policy,service_name,instances):
    """Record ``instances`` as ``m_container_count`` for the named service.

    Walks ``policy['scaling']['services']`` (treated as empty when absent)
    and, for an entry whose ``name`` equals ``service_name``, creates the
    ``outputs`` dictionary if needed and sets ``m_container_count`` in it.
    The policy is modified in place and nothing is returned.
    """
    services = policy.get('scaling',dict()).get('services',dict())
    for entry in services:
        if entry.get('name','') != service_name:
            continue
        entry.setdefault('outputs',dict())['m_container_count'] = instances

def prepare_session(policy_yaml):
global log
log = logging.getLogger('pk')
Expand All @@ -123,6 +141,20 @@ def prepare_session(policy_yaml):
policy.get('stack','pk'))
log.info('(C) Notify prometheus to reload config starts')
prom.notify_to_reload_config(config['prometheus_endpoint'])
log.info('(C) Querying number of target nodes from Occopus starts')
instances = occo.query_number_of_worker_nodes(
endpoint=config['occopus_endpoint'],
infra_name=config['occopus_infra_name'],
worker_name=config['occopus_worker_name'])
log.info('(C) Setting m_node_count to {0}'.format(instances))
set_worker_node_instance_number(policy,instances)
log.info('(C) Querying number of service replicas from Swarm starts')
for theservice in policy.get('scaling',dict()).get('services',dict()):
service_name = theservice.get('name','')
full_service_name = get_full_service_name(policy, service_name)
instances = dock.query_docker_service_replicas(config['swarm_endpoint'],full_service_name)
log.info('(C) Setting m_container_count for {0} to {1}'.format(service_name, instances))
set_docker_service_instance_number(policy,service_name,instances)
return policy

def add_query_results_and_alerts_to_nodes(policy, results):
Expand Down

0 comments on commit 9f32737

Please sign in to comment.