Skip to content

Commit

Permalink
Skeleton for liveness analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcere committed Jul 11, 2024
1 parent 48a8855 commit 5d4096a
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 17 deletions.
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,11 @@ dependencies = [
"pytest",
]
name = "grey"
readme = "README.md"
readme = "README.md"

[tool.pytest.ini_options]
pythonpath = "src"
log_cli = true
log_cli_level = "DEBUG"
log_cli_format = "%(asctime)s [%(levelname)s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
Empty file added src/analysis/__init__.py
Empty file.
51 changes: 51 additions & 0 deletions src/analysis/abstract_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Module that implements an AbstractState and an AbstractAnalysisInfo in order to perform the
fixpoint analysis.
"""

from typing import Any
from abc import ABC, abstractmethod


class AbstractBlockInfo(ABC):
"""
Class that contains the information from the program needed to perform the corresponding analysis.
Includes information about the CFG
"""

@property
@abstractmethod
def block_id(self) -> Any:
raise NotImplementedError

@property
@abstractmethod
def jumps_to(self) -> Any:
raise NotImplementedError

@property
@abstractmethod
def falls_to(self) -> Any:
raise NotImplementedError

@property
@abstractmethod
def block_type(self) -> Any:
raise NotImplementedError


class AbstractState(ABC):
"""
Class that contains the methods needed in the state of the corresponding analysis
"""

def __init__(self):
pass

@abstractmethod
def lub(self, state: 'AbstractState') -> None:
raise NotImplementedError

@abstractmethod
def leq(self, state: 'AbstractState') -> bool:
raise NotImplementedError
153 changes: 153 additions & 0 deletions src/analysis/fixpoint_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""
Original Author: gromandiez
Module that serves as the skeleton for fixpoint-based analysis. Used for implementing the liveness analysis to detect
which variables must be reused. Adapted from
https://github.com/costa-group/EthIR/blob/fea70e305801258c3ec50b47e1251237063d3fcd/ethir/analysis/fixpoint_analysis.py
"""

import logging
from analysis.abstract_state import AbstractState, AbstractBlockInfo
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
from parser.cfg_block import CFGBlock

# Relevant types to consider in the algorithm
block_T = AbstractBlockInfo
block_id_T = str
var_T = str
state_T = AbstractState


class BlockAnalysisInfo(ABC):
"""
Class that contains the information needed to manage and propagate the information
for a block in a given analysis. A concrete class has to implement the method
of propagation
"""

# Creates an initial abstract state with the received information
def __init__(self, block_info: block_T, input_state: state_T):
self.block_info: block_T = block_info
self.input_state: state_T = input_state
self.output_state: Optional[state_T] = None

def get_input_state(self) -> state_T:
return self.input_state

def get_output_state(self) -> state_T:
return self.output_state

def revisit_block(self, input_state: state_T):
"""
Evaluates if a block need to be revisited or not
"""
logging.debug("Comparing...")
logging.debug("Current input state: " + str(self))
logging.debug("Compared state: " + str(input_state))
leq = input_state.leq(self.input_state)
logging.debug("Result: " + str(leq))

if leq:
return False

self.input_state.lub(input_state)
return True

@abstractmethod
def propagate_information(self):
"""
For a more efficient implementation, we consider propagate information as an abstract
method. This way, we don't need to create and destroy an output state each time it is updated
"""
raise NotImplementedError

def process_block(self):
# We start with the initial state of the block
current_state = self.input_state
id_block = self.block_info.block_id

logging.debug("Processing " + str(id_block) + " :: " + str(current_state))
self.propagate_information()
logging.debug("Resulting state " + str(id_block) + " :: " + str(self.output_state))

def __repr__(self):
textual_repr = str(self.block_info.block_id) + "." + "Input State: " + str(self.input_state) + \
". Output State: " + str(self.output_state) + "."
return textual_repr


class Analysis:

def __init__(self, vertices: Dict[block_id_T, block_T], initial_block: block_id_T, initial_state: state_T,
analysis_info_constructor):
self.vertices = vertices
self.pending = [initial_block]
self.constructor = analysis_info_constructor

# Info from the analysis for each block
self.blocks_info: Dict[block_id_T, BlockAnalysisInfo] = {initial_block: analysis_info_constructor(vertices[initial_block], initial_state)}

def analyze(self):
while len(self.pending) > 0:
block_id = self.pending.pop()

# Process the block
block_info = self.blocks_info[block_id]

block_info.process_block()

# Returns the output state of the corresponding block
output_state = block_info.get_output_state()

# Propagates the information
self.process_jumps(block_id, output_state)

def process_jumps(self, block_id: block_id_T, input_state: state_T):
"""
Propagates the information according to the blocks and the input state
"""
logging.debug("Process JUMPS")
logging.debug("Input State: " + str(input_state))
basic_block = self.vertices[block_id]

if basic_block.block_type == "terminal" or basic_block.block_type == "mainExit":
return

jump_target = basic_block.jumps_to

if jump_target != 0 and jump_target != -1:
if self.blocks_info.get(jump_target) is None:
self.pending.append(jump_target)
# print("************")
# print(block_id)
# print(jump_target)
# print(self.vertices[block_id].display())
self.blocks_info[jump_target] = self.constructor(self.vertices[jump_target], input_state)

elif self.blocks_info.get(jump_target).revisit_block(input_state):
#print("REVISITING BLOCK!!! " + str(jump_target))
self.pending.append(jump_target)

jump_target = basic_block.falls_to

if jump_target is not None:
if self.blocks_info.get(jump_target) is None:
self.pending.append(jump_target)
self.blocks_info[jump_target] = self.constructor(self.vertices[jump_target], input_state)

elif self.blocks_info.get(jump_target).revisit_block(input_state):
self.pending.append(jump_target)

def get_analysis_results(self):
return self.blocks_info

def get_block_results(self, block_id):
if str(block_id).find("_") == -1:
block_id = int(block_id)
return self.blocks_info[block_id]

def __repr__(self):
for id_ in self.blocks_info:
print(str(self.blocks_info[id_]))
return ""
Empty file added src/liveness/__init__.py
Empty file.
42 changes: 42 additions & 0 deletions src/liveness/liveness_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from typing import Dict
from analysis.fixpoint_analysis import BlockAnalysisInfo, Analysis
from liveness.liveness_state import LivenessState, LivenessBlockInfo
from parser.cfg import CFG


class LivenessAnalysisInfo(BlockAnalysisInfo):
"""
Assumes self.input_stack and self.block_info are updated accordingly
"""

def propagate_information(self):
# If the output state is None, we need to propagate the information from the block and the input state
if self.output_state is None:
output_state = LivenessState()
output_state.live_vars = self.input_state.live_vars.union(self.block_info.propagated_variables)
self.output_state = output_state

# Otherwise, the information from the block is already propagated
else:
self.output_state.lub(self.input_state)


def construct_vertices(cfg: CFG):
# TODO: decide how to construct the vertices for each subobject
for object_id, subobject in cfg.objectCFG.items():
return {block_id: LivenessBlockInfo(block) for block_id, block in subobject.blocks.items()}


def liveness_analysis_from_vertices(vertices: Dict[str, LivenessBlockInfo], initial_block: str) -> Analysis:
liveness_analysis = Analysis(vertices, initial_block, LivenessState(), LivenessAnalysisInfo)
liveness_analysis.analyze()
return liveness_analysis


def perform_liveness_analysis(cfg: CFG):
logging.debug("Start analysis...")
vertices = construct_vertices(cfg)
logging.debug("Start analysis...")
liveness_analysis = liveness_analysis_from_vertices(vertices, "Block0")
return liveness_analysis.get_analysis_results()
75 changes: 75 additions & 0 deletions src/liveness/liveness_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Module that implements the state needed to perform the liveness analysis
"""

from analysis.abstract_state import AbstractState, AbstractBlockInfo
from parser.cfg_block import CFGBlock, CFGInstruction
from typing import Dict, List, Tuple, Set, Any

var_T = str


def _uses_defines_from_instructions(instructions: List[CFGInstruction]) -> Tuple[Set[var_T], Set[var_T]]:
"""
Generates uses and defines sets with the variables that are used and defined in the set of instructions, resp.
"""
uses, defines = set(), set()
for instruction in instructions:
uses.update(instruction.in_args)
defines.update(instruction.out_args)
return uses, defines


class LivenessBlockInfo(AbstractBlockInfo):

def __init__(self, basic_block: CFGBlock):
super().__init__()
self._id = basic_block.block_id
self._jumps_to = basic_block.get_jump_to()
self._falls_to = basic_block.get_falls_to()
self._block_type = basic_block.get_jump_type()
self.uses, self.defines = _uses_defines_from_instructions(basic_block.get_instructions())

# Variables that need to be propagated
self.propagated_variables = self.uses.difference(self.defines)

@property
def block_id(self) -> Any:
return self._id

@property
def jumps_to(self) -> Any:
return self._jumps_to

@property
def falls_to(self) -> Any:
return self._jumps_to

@property
def block_type(self) -> Any:
return self._block_type

def __repr__(self):
text_repr = [f"Block id: {self._id}", f"Block type: {self.block_type}",
f"Jumps to: {self.jumps_to}", f"Falls to: {self.falls_to}",
f"Propagated variables: {self.propagated_variables}"]
return '\n'.join(text_repr)


class LivenessState(AbstractState):
"""
Records the liveness state from a given block
"""

def __init__(self):
super().__init__()
self.live_vars = set()

def lub(self, state: 'LivenessState') -> None:
self.live_vars.union(state.live_vars)

def leq(self, state: 'LivenessState') -> bool:
return self.live_vars.issubset(state.live_vars)

def __repr__(self):
return str(self.live_vars)
8 changes: 2 additions & 6 deletions src/parser/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,29 @@ def store_sfs_json(blocks: List[Dict], final_path: Path) -> None:
with open(file_to_store, 'w') as f:
json.dump(block, f, indent=4)


class CFG:
def __init__(self, file_name: str, nodeType : str):
self.file_name = file_name
def __init__(self, nodeType: str):
self.nodeType = nodeType
self.objectCFG : Dict[str, CFGObject] = {}
self.subObjects = {}

def add_subobjects(self, subobjects):
self.subObjects = subobjects


def add_object(self, name:str, cfg_object: CFGObject) -> None:
self.objectCFG[name] = cfg_object


def get_object(self, name:str) -> CFGObject:
return self.objectCFG[name]


def build_spec_for_objects(self):
object_dict = {}
for o in self.objectCFG:
specs = self.objectCFG[o].build_spec_for_blocks()
object_dict[o] = specs
return object_dict


def get_as_json(self):
json_cfg = {}
json_cfg["nodeType"] = self.nodeType
Expand Down
Loading

0 comments on commit 5d4096a

Please sign in to comment.