Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions homework/lsl/acceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-

import cluster
import message

import Queue
import logging


logger = logging.getLogger(__name__)


class Acceptor(cluster.Node):
def __init__(self, ident):
super(Acceptor, self).__init__(ident, cluster.Role.ACCEPTOR)
self.register()

''' pre_round_num is None means never receives msg '''
self.pre_round_num = None

''' round_num and value accepted '''
self.round_num = None
self.value = None

def stable_store(self, round_num, value=None, accepted=False):
''' simulate store info on stable storage '''
self.pre_round_num = round_num

if accepted:
self.round_num = round_num
self.value = value

def handle_msg(self):
send_msg = None
recv_msg = self.receive()

logger.info("{who} receives msg: {msg}".format(who=self.ident, msg=recv_msg))

if recv_msg.category == message.Category.PREPATR_REQUEST:
round_num = recv_msg.round_num

if self.pre_round_num is None or round_num > self.pre_round_num:
send_msg = message.Message(self.ident,
recv_msg.sender,
self.round_num,
self.value,
message.Category.PREPATR_REQUEST,
message.MessageType.RESPONSE)

self.stable_store(round_num)
logger.info("{who} accept prepare request: {m}".format(who=self.ident, m=recv_msg))
else:
logger.info("{who} reject prepare request: {m}".format(who=self.ident, m=recv_msg))

elif recv_msg.category == message.Category.ACCEPT_REQUEST:
round_num = recv_msg.round_num
if self.pre_round_num is None or round_num >= self.pre_round_num:
send_msg = message.Message(self.ident,
recv_msg.sender,
round_num,
recv_msg.value,
message.Category.ACCEPT_REQUEST,
message.MessageType.RESPONSE)
self.stable_store(round_num, recv_msg.value, True)
logger.info("{who} accept accept request: {m}".format(who=self.ident, m=recv_msg))
else:
logger.info("{who} reject accept request: {m}".format(who=self.ident, m=recv_msg))

else:
''' ignore unkonwn msg '''
logger.warn("received unknoen message: {msg}".format(msg=recv_msg))
pass

if send_msg is not None:
self.send(send_msg)

def run(self):
while True:
self.handle_msg()
72 changes: 72 additions & 0 deletions homework/lsl/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
'''
nodes info in cluster
'''

import copy
import Queue
import logging
import random
import time


class Role(object):
PROPOSER = "proposer"
ACCEPTOR = "acceptor"


random.seed(time.time())
logger = logging.getLogger(__name__)
CLUSTER_NODES_BY_IDENT = {}
CLUSTER_NODES_BY_ROLE = {
Role.PROPOSER: {},
Role.ACCEPTOR: {},
}


class Node(object):
def __init__(self, ident, role):
self.role = role
self.ident = ident
self.channel = {
# TODO(lsl): one queue may enough
"send": Queue.Queue(),
"recv": Queue.Queue(),
}

def receive(self, timeout=None):
msg = None
try:
msg = self.channel["recv"].get(timeout=timeout)
return msg
except Queue.Empty as err:
logger.info("timeout before receive any messages, {e}".format(e=err))
return None


def send(self, msg):
rece_chan = self.getNodeChannel(msg.receiver)
rece_chan["recv"].put(msg)

def register(self):
CLUSTER_NODES_BY_IDENT[self.ident] = self.channel
CLUSTER_NODES_BY_ROLE[self.role][self.ident] = self.channel

def getNodeChannel(self, ident):
return CLUSTER_NODES_BY_IDENT[ident]


def getRandomNodesByRoleList(role):
idents = CLUSTER_NODES_BY_ROLE[role].keys()
random.seed(time.time())
random.shuffle(idents)

ret = []
for ident in idents:
ret.append({
"ident": ident,
"channel": CLUSTER_NODES_BY_IDENT[ident],
})

return ret

33 changes: 33 additions & 0 deletions homework/lsl/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-


class Category(object):
ACCEPT_REQUEST = "accept"
PREPATR_REQUEST = "prepare"


class MessageType(object):
REQUEST = "request"
RESPONSE = "response"


class Message(object):
def __init__(self, sender, receiver, round_num, value, category, m_type):
self.sender = sender
self.receiver = receiver

''' round_num is None and value is None means never receives msg '''
self.round_num = round_num
self.value = value

''' prepare request or accept request '''
self.category = category
''' request or response which is not a must '''
self.m_type = m_type

def __repr__(self):
return "<sender: {s}, recevier: {r}, "\
"round:{n}, value: {v}, {c}-{m}>"\
"".format(s=self.sender, r=self.receiver,
n=self.round_num, v=self.value,
c=self.category, m=self.m_type)
78 changes: 78 additions & 0 deletions homework/lsl/paxos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
"""
main entry of paxos demo
"""

import cluster
import proposer
import acceptor

import time
import threading
import logging


logger = logging.getLogger(__name__)


def make_nodes(num, role):
ret= []
for idx in range(num):
if role == cluster.Role.PROPOSER:
node = proposer.Proposer("proposer-{idx}".format(idx=idx))

elif role == cluster.Role.ACCEPTOR:
node = acceptor.Acceptor("acceptor-{idx}".format(idx=idx))

else:
raise Exception("unknown role: {r}".format(r=role))

ret.append(node)

return ret

def main():
acceptors = make_nodes(5, cluster.Role.ACCEPTOR)
proposers = make_nodes(3, cluster.Role.PROPOSER)

logger.info("cluster: {nodes}".format(nodes=cluster.CLUSTER_NODES_BY_IDENT))

''' start acceptors '''
for acptor in acceptors:
threading.Thread(target=acptor.run).start()

logger.info("acceptors started")
time.sleep(0.1)

proposer_ths = []
value = 0
''' start proposers '''
for pro in proposers:
th = threading.Thread(target=pro.propose, args=(value,))
th.start()
proposer_ths.append(th)
value += 1

logger.info("proposers started")

for th in proposer_ths:
th.join()

logger.info("propose finished")
for acptor in acceptors:
logger.info("{who}: rnd: {rnd}, value: {val}".format(who=acptor.ident,
rnd=acptor.round_num,
val=acptor.value))


def init_log():
log_format = '%(asctime)s:%(levelname)s:%(filename)s:' \
'%(funcName)s:%(lineno)s:%(message)s'
datefmt = '%a, %d %b %Y %H:%M:%S'
level = logging.DEBUG

logging.basicConfig(format=log_format, datefmt=datefmt, level=level)

if __name__ == "__main__":
init_log()
main()
Loading