from csdl_alpha.src.graph.graph import Graph
from csdl_alpha.utils.inputs import get_type_string
import numpy as np
from typing import Union
class Recorder:
    """
    The Recorder class assembles CSDL variables and operations into a computational graph.

    Attributes
    ----------
    manager : Manager
        The global manager object that manages the recorders.
    inline, debug, expand_ops, auto_hierarchy : bool
        Flags stored verbatim from the constructor arguments.
    hierarchy : int
        Current namespace depth (0 at the root namespace).
    namespace_tree : Namespace
        Root of the namespace tree.
    active_namespace : Namespace
        The currently active namespace node.
    graph_tree : Tree
        Root of the graph tree; its ``value`` is the root graph.
    active_graph : Graph
        The currently active graph.
    active_graph_stack : list
        Stack of entered graphs; the last element is the active graph.
    graph_to_tree_node_map : dict
        Maps each graph created by this recorder to its ``Tree`` node.
    node_graph_map : dict
        Maps each node passed to ``_add_node`` to a list holding the graph it was added to.
    design_variables, constraints : dict
        Map a variable to the tuple ``(scaler, lower, upper, adder)``.
    objectives : dict
        Maps the objective variable to the tuple ``(scaler, adder)``.
    """

    def __init__(self,
                 inline: bool = False,
                 debug: bool = False,
                 expand_ops: bool = False,
                 auto_hierarchy: bool = False):
        """
        Initializes a Recorder object and registers it with the global manager.

        Parameters
        ----------
        inline : bool, optional
            Specifies whether to run inline evaluations, by default False.
        debug : bool, optional
            Specifies whether to enable debug mode, by default False.
        expand_ops : bool, optional
            Specifies whether to expand composed operations, by default False.
        auto_hierarchy : bool, optional
            Specifies whether variables get the current namespace depth assigned
            via ``set_hierarchy`` when their namespace is set, by default False.
        """
        from csdl_alpha.api import manager
        from csdl_alpha.src.graph.variable import Variable

        self.manager = manager
        self.inline = inline
        self.debug = debug
        self.expand_ops = expand_ops
        self.auto_hierarchy = auto_hierarchy
        self.hierarchy = 0

        # keep track of inline stuff for loops
        self._in_loop = False
        self._reset_loops = False

        # values are (scaler, lower, upper, adder); see _add_design_variable / _add_constraint
        self.design_variables: dict[Variable, tuple] = {}
        self.constraints: dict[Variable, tuple] = {}
        # values are (scaler, adder); see _add_objective
        self.objectives: dict[Variable, tuple] = {}

        self.namespace_tree = Namespace(None)
        self.active_namespace = self.namespace_tree

        # TODO: unbloat...
        self.graph_tree = Tree(Graph(name='root'))
        self.active_graph: Graph = self.graph_tree.value
        self.node_graph_map = {}
        self.active_graph_stack = [self.active_graph]
        self.graph_to_tree_node_map = {
            self.active_graph: self.graph_tree
        }

        manager.constructed_recorders.append(self)

    def start(self):
        """
        Activates the recorder.
        """
        self.manager.activate_recorder(self)

    def stop(self):
        """
        Deactivates the recorder.
        """
        self.manager.deactivate_recorder(self)

    def execute(self):
        """
        Executes the current active graph inline and updates all variable values
        """
        # TODO: TEST TEST TEST TEST
        self.active_graph.execute_inline()

    def _find_variables_by_name(self, name: str) -> list:
        """
        !!!UNTESTED!!!
        Finds the variable in the active graph whose name, or one of its
        alias names, equals ``name``.

        Parameters
        ----------
        name : str
            The name of the variable to find.

        Returns
        -------
        list
            A single-element list holding the matching variable.

        Raises
        ------
        KeyError
            If no variable, or more than one variable, matches ``name``.
        """
        from csdl_alpha.src.graph.variable import Variable

        matched_names: list[Variable] = []
        for node in self.active_graph.node_table:
            if not isinstance(node, Variable):
                continue
            # A node is recorded at most once, even if several of its aliases match.
            if name == node.name or any(alias == name for alias in node.names):
                matched_names.append(node)

        if len(matched_names) == 0:
            raise KeyError(f"No variable with name '{name}' found")
        if len(matched_names) > 1:
            raise KeyError(f"Multiple variables with name '{name}' found")
        return matched_names

    def find_variable_by_name(self, *names: str) -> Union['Variable', tuple['Variable', ...]]:
        """Given strings, returns the variables with the given names.

        Parameters
        ----------
        names : str
            The names of the variables to find.

        Returns
        -------
        Variable or tuple[Variable]
            The single variable when exactly one name is given; otherwise a tuple
            of variables in the order the names were provided.

        Raises
        ------
        KeyError
            If no variable with a given name is found.
        KeyError
            If multiple variables with the same given name are found.
        TypeError
            If strings not given.
        """
        # Validate every argument before doing any lookup.
        for name in names:
            if not isinstance(name, str):
                raise TypeError(f"Names must be of type 'string'. Got type {get_type_string(name)}")

        matched_names = [self._find_variables_by_name(name)[0] for name in names]
        if len(matched_names) == 1:
            return matched_names[0]
        return tuple(matched_names)

    def gather_insights(self) -> dict[str, Union[dict, set]]:
        """
        UNTESTED!
        Walks the root graph and every subgraph reachable through subgraph
        operations (breadth-first) and collects summary information.

        Returns
        -------
        dict
            Dictionary with the keys:

            - ``'names2nodes'``: maps each variable name to its variable.
            - ``'nodes2graphs'``: maps each node to the list of graphs containing it.
            - ``'input_nodes'``: variables of the root graph without predecessors.
            - ``'graph_tree'``: maps each graph to a list of ``(operation, subgraph)``
              tuples followed by one ``(None, '(+N ops)')`` entry counting the
              operations of that graph that are not subgraph operations.
            - ``'analytics'``: counters; ``'number of edges'`` and
              ``'number of namespaces'`` are initialized to 0 and not updated here.
        """
        # TODO: TEST TEST TEST TEST
        from collections import deque
        from csdl_alpha.src.operations.operation_subclasses import SubgraphOperation
        from csdl_alpha.src.graph.variable import Variable

        analytics = {
            'number of nodes': 0,
            'number of edges': 0,
            'number of variables': 0,
            'number of operations': 0,
            'number of namespaces': 0,
            'number of graphs': 0,
        }
        names2nodes = {}
        nodes2graphs = {}
        input_nodes = set()
        graph_tree = {}
        information_dict = {
            'names2nodes': names2nodes,
            'nodes2graphs': nodes2graphs,
            'input_nodes': input_nodes,
            'graph_tree': graph_tree,
            'analytics': analytics,
        }

        seen_variables = set()
        root_graph = self.get_root_graph()
        pending = deque([root_graph])
        while pending:
            current_graph = pending.popleft()
            current_num_ops = 0

            # initialize graph tree and analytics
            graph_tree[current_graph] = []
            analytics['number of graphs'] += 1

            # iterate over all nodes in the current graph and store information
            for node in current_graph.node_table:
                analytics['number of nodes'] += 1
                if isinstance(node, Variable):
                    # store information about the variable once
                    if node not in seen_variables:
                        for name in node.names:
                            names2nodes[name] = node
                        analytics['number of variables'] += 1
                        seen_variables.add(node)

                    # track all input nodes of the RECORDER (not all subgraphs)
                    if current_graph is root_graph:
                        if len(current_graph.predecessors(node)) == 0:
                            input_nodes.add(node)
                else:
                    analytics['number of operations'] += 1
                    if isinstance(node, SubgraphOperation):
                        subgraph = node.get_subgraph()
                        pending.append(subgraph)
                        graph_tree[current_graph].append((node, subgraph))
                    else:
                        current_num_ops += 1

                # track all graphs that the node is in
                nodes2graphs.setdefault(node, []).append(current_graph)

            graph_tree[current_graph].append((None, f'(+{current_num_ops} ops)'))
        return information_dict

    def count_origins(self, n=10, mode='function', type='operation'):
        """Count the origins of recorded nodes and print the most common ones.

        Every node registered in ``node_graph_map`` (i.e. added through
        ``_add_node``) is considered. Origins are read from each node's
        ``origin_info`` and grouped by function, file, or line.

        Parameters
        ----------
        n : int, optional
            The number of origins to display, by default 10.
        mode : str, optional
            The mode to count the origins. Valid options are 'function', 'file', or 'line', by default 'function'.
        type : str, optional
            The type of nodes to count the origins. Valid options are 'operation', 'variable', or 'all', by default 'operation'.

        Raises
        ------
        ValueError
            If an invalid mode or type is provided.
        """
        # Validate up front so a bad argument is reported even when no node matches.
        if mode not in ('function', 'file', 'line'):
            raise ValueError(f"Invalid mode: {mode}")
        if type not in ('operation', 'variable', 'all'):
            raise ValueError(f"Invalid type: {type}")

        import csdl_alpha as csdl
        from csdl_alpha.src.graph.operation import Operation

        # TODO: allow users to specify specific operations to count
        if type == 'operation':
            types = (Operation,)
        elif type == 'variable':
            types = (csdl.Variable,)
        else:
            types = (Operation, csdl.Variable)

        origin_counts = {}
        for node in self.node_graph_map:
            if not isinstance(node, types):
                continue
            if mode == 'function':
                origin = f"{node.origin_info['function']} in {node.origin_info['filename']}"
            elif mode == 'file':
                origin = node.origin_info['filename']
            else:
                origin = f"{node.origin_info['filename']}:{node.origin_info['lineno']}"
            origin_counts[origin] = origin_counts.get(origin, 0) + 1

        sorted_origins = sorted(origin_counts.items(), key=lambda x: x[1], reverse=True)
        for i, (origin, count) in enumerate(sorted_origins):
            if i >= n:
                break
            print(f'{origin} : {count}')

    def count_operations(self, n=10):
        """
        Prints the most common operation classes in the active graph.

        Parameters
        ----------
        n : int, optional
            The number of operations to print, by default 10.
        """
        from csdl_alpha.src.graph.operation import Operation

        operation_counts = {}
        for node in self.active_graph.node_table:
            if isinstance(node, Operation):
                class_name = node.__class__.__name__
                operation_counts[class_name] = operation_counts.get(class_name, 0) + 1

        ranked = sorted(operation_counts.items(), key=lambda x: x[1], reverse=True)
        for i, (operation, count) in enumerate(ranked):
            if i >= n:
                break
            print(f'{operation} : {count}')

    def print_largest_variables(self, n=10):
        """
        Prints the largest variables (by number of elements) in the active graph.

        Parameters
        ----------
        n : int, optional
            The number of variables to print, by default 10.
        """
        from csdl_alpha.src.graph.variable import Variable

        variable_sizes = {}
        for node in self.active_graph.node_table:
            if isinstance(node, Variable):
                variable_sizes[node] = np.prod(node.shape)

        ranked = sorted(variable_sizes.items(), key=lambda x: x[1], reverse=True)
        for i, (variable, size) in enumerate(ranked):
            if i >= n:
                break
            print(f'{variable.info()} : {variable.shape}')

    def print_graph_structure(self):
        """
        Prints the graph tree (root graph and nested subgraphs) to the console.

        Each graph is printed by name with its subgraphs listed beneath it,
        followed by a ``(+N ops)`` entry for its remaining operations.
        """
        # TODO: TEST TEST TEST TEST
        graph_tree = self.gather_insights()['graph_tree']

        # from https://stackoverflow.com/questions/51903172/how-to-display-a-tree-in-python-similar-to-msdos-tree-command
        def ptree(parent, tree, indent=''):
            # Leaf entries are plain strings such as '(+3 ops)'; graphs are printed by name.
            if isinstance(parent, str):
                print(parent)
            else:
                print(parent.name)
            if parent not in tree:
                return

            indent += ' '
            for child in tree[parent][:-1]:
                print(indent + '|' + '-' * 4, end='')
                ptree(child[1], tree, indent + '|' + ' ' * 4)
            if len(tree[parent]) > 0:
                child = tree[parent][-1]
                print(indent + '`' + '-' * 4, end='')
                ptree(child[1], tree, indent + ' ' * 4)

        ptree(self.get_root_graph(), graph_tree)

    def _enter_namespace(self, name: str):
        """
        Enters a new child namespace of the active namespace.

        Parameters
        ----------
        name : str
            The name of the namespace to enter.

        Raises
        ------
        TypeError
            If ``name`` is not a string.
        Exception
            If the active namespace already has a child with this name.
        """
        if not isinstance(name, str):
            raise TypeError("Name of namespace is not a string")
        if name in self.active_namespace.child_names:
            raise Exception("Attempting to enter existing namespace")

        self.hierarchy += 1
        self.active_namespace.child_names.add(name)

        # The root namespace has no name, so its children are not prefixed.
        if self.active_namespace.name is None:
            prepend = name
        else:
            prepend = self.active_namespace.prepend + '.' + name
        self.active_namespace = self.active_namespace.add_child(name, prepend=prepend)

    def _exit_namespace(self):
        """
        Exits the current namespace and makes its parent the active namespace.

        Raises
        ------
        Exception
            If the active namespace is the root namespace.
        """
        if self.active_namespace.parent is None:
            raise Exception("Attempting to exit root namespace")
        self.hierarchy -= 1
        self.active_namespace = self.active_namespace.parent

    def _enter_subgraph(
            self,
            add_missing_variables: bool = False,
            name: str = None,
            graph: 'Graph' = None,
        ):
        """
        Enters a subgraph and pushes it onto the active graph stack.

        Parameters
        ----------
        add_missing_variables : bool, optional
            Stored on a newly created graph; ignored when ``graph`` is given.
        name : str, optional
            Name of the newly created graph; ignored when ``graph`` is given.
        graph : Graph, optional
            Existing graph to enter. When None, a new graph is created as a
            child of the active graph in the graph tree.

        Raises
        ------
        TypeError
            If an argument has the wrong type.
        """
        # TODO: TEST TEST TEST TEST
        if not isinstance(add_missing_variables, bool):
            raise TypeError(f"add_missing_variables must be a boolean. Got {get_type_string(add_missing_variables)}")
        if name is not None and not isinstance(name, str):
            raise TypeError(f"name must be a string. Got {get_type_string(name)}")
        if graph is not None and not isinstance(graph, Graph):
            raise TypeError(f"graph must be a Graph object. Got {get_type_string(graph)}")

        if graph is None:
            # Add new graph to tree
            active_graph_node = self.graph_to_tree_node_map[self.active_graph].add_child(Graph(name=name))
            self.active_graph = active_graph_node.value

            # Add new graph to graph to tree node map
            self.graph_to_tree_node_map[self.active_graph] = active_graph_node
            self.active_graph_stack.append(self.active_graph)
            self.active_graph.add_missing_variables = add_missing_variables
            self.active_graph.inputs = []
        else:
            self.active_graph = graph
            self.active_graph_stack.append(self.active_graph)

    def _exit_subgraph(self):
        """
        Exits the current subgraph and reactivates the previously active graph.

        Raises
        ------
        IndexError
            If only the root graph is on the stack. The stack is left untouched
            in that case.
        """
        if len(self.active_graph_stack) <= 1:
            raise IndexError("Attempting to exit root graph")
        self.active_graph_stack.pop()
        self.active_graph = self.active_graph_stack[-1]

    def _add_node(self, node):
        """
        Adds a node to the active graph and records that graph for the node.

        Parameters
        ----------
        node
            The node to add.
        """
        self.active_graph.add_node(node)
        self.node_graph_map[node] = [self.active_graph]

    def _set_namespace(self, node):
        """
        Sets the namespace of a node to the active namespace.

        When ``auto_hierarchy`` is enabled and the node is a Variable, the
        current namespace depth is also passed to ``node.set_hierarchy``.
        """
        from csdl_alpha.src.graph.variable import Variable

        self.active_namespace.nodes.append(node)
        node.namespace = self.active_namespace
        if self.auto_hierarchy and isinstance(node, Variable):
            node.set_hierarchy(self.hierarchy)

    def _add_edge(self, node_from, node_to):
        """
        Adds an edge between two nodes in the active graph.

        If the source node is missing from the active graph, it is added as a
        graph input when the graph has ``add_missing_variables`` set and the
        node is a Variable.

        Parameters
        ----------
        node_from
            The source node.
        node_to
            The target node.

        Raises
        ------
        ValueError
            If the source node is missing and cannot be added, or if the
            target node is not in the active graph.
        """
        from csdl_alpha.src.graph.variable import Variable

        graph = self.active_graph
        if node_from not in graph.node_table:  # TODO: consider changing node_graph_map to reflect this
            if graph.add_missing_variables and isinstance(node_from, Variable):
                graph.add_node(node_from)
                if node_from not in graph.inputs:
                    graph.inputs.append(node_from)
            else:
                raise ValueError(f"Node {node_from.info()} not in graph")
        if node_to not in graph.node_table:
            raise ValueError(f"Node {node_to.info()} not in graph")
        graph.add_edge(node_from, node_to)

    def _add_design_variable(self, variable, upper, lower, scaler, adder):
        """
        Adds a design variable to the recorder.

        Parameters
        ----------
        variable
            The design variable.
        upper
            The upper bound of the design variable.
        lower
            The lower bound of the design variable.
        scaler
            The scaler value of the design variable.
        adder
            The adder value of the design variable.

        Notes
        -----
        Stored as ``(scaler, lower, upper, adder)``.
        """
        self.design_variables[variable] = (scaler, lower, upper, adder)

    def _add_constraint(self, variable, upper, lower, scaler, adder):
        """
        Adds a constraint to the recorder.

        Parameters
        ----------
        variable
            The constraint variable.
        upper
            The upper bound of the constraint.
        lower
            The lower bound of the constraint.
        scaler
            The scaler value of the constraint.
        adder
            The adder value of the constraint.

        Notes
        -----
        Stored as ``(scaler, lower, upper, adder)``.
        """
        self.constraints[variable] = (scaler, lower, upper, adder)

    def _add_objective(self, variable, scaler, adder):
        """
        Adds an objective to the recorder; only one objective may be set.

        Parameters
        ----------
        variable
            The objective variable.
        scaler
            The scaler value of the objective.
        adder
            The adder value of the objective.

        Raises
        ------
        ValueError
            If an objective has already been set.
        """
        if len(self.objectives) > 0:
            raise ValueError("Objective has already been set")
        self.objectives[variable] = (scaler, adder)

    def _delete_current_graph(self):
        """
        Deletes the current graph from the graph tree and activates its parent.

        The graph's tree node is detached from its parent, its entry in
        ``graph_to_tree_node_map`` is dropped, and it is popped from the
        active graph stack when it is the top entry.

        Raises
        ------
        ValueError
            If the active graph has no tree node in this recorder, or if it
            is the root graph.
        """
        current_graph = self.active_graph
        current_graph_node = self.graph_to_tree_node_map.get(current_graph)
        if current_graph_node is None:
            raise ValueError("Active graph is not part of the recorder's graph tree")
        parent_graph_node = current_graph_node.parent
        if parent_graph_node is None:
            raise ValueError("Attempting to delete root graph")

        parent_graph_node.children.remove(current_graph_node)
        del self.graph_to_tree_node_map[current_graph]
        # Keep the stack consistent with the tree: the deleted graph must not stay active.
        if self.active_graph_stack and self.active_graph_stack[-1] is current_graph:
            self.active_graph_stack.pop()
        self.active_graph = parent_graph_node.value

    def visualize_graph(
            self,
            filename: str = 'image',
            visualize_style: str = 'namespace',
            trim_loops=False,
            format='svg',
        ) -> None:
        """
        Visualizes the graph.

        - 'namespace' visualizes the active graph via ``Graph.visualize``
        - 'hierarchical' visualizes all graphs including subgraph operations for debugging.
          Always saves as a .svg file; ``trim_loops`` and ``format`` are not used.

        Parameters
        ----------
        filename : str, optional
            The filename to save the visualization to, by default 'image'
        visualize_style : str, optional
            The style of visualization, by default 'namespace'
        trim_loops : bool, optional
            Whether to trim loops, by default False
        format : str, optional
            The format of the visualization, by default 'svg'

        Raises
        ------
        ValueError
            If ``visualize_style`` is neither 'namespace' nor 'hierarchical'.
        """
        if visualize_style == 'namespace':
            self.active_graph.visualize(filename, trim_loops=trim_loops, format=format)
        elif visualize_style == 'hierarchical':
            self.visualize_hierarchical(filename)
        else:
            raise ValueError(f"Invalid visualize_style: {visualize_style}. Must be 'namespace' or 'hierarchical'")

    def visualize_hierarchical(self, filename):
        """
        Draws the root graph and every nested subgraph as pydot clusters and
        writes the result to ``{filename}.svg``.

        Parameters
        ----------
        filename : str
            Output path without the ``.svg`` extension.
        """
        from collections import deque
        import pydot
        from csdl_alpha.src.graph.variable import Variable
        from csdl_alpha.src.operations.loops.loop import Loop
        from csdl_alpha.src.operations.loops.new_loop.new_loop import NewLoop

        # Get the graph tree structure
        dot = pydot.Dot(graph_type='digraph')
        insights = self.gather_insights()

        # utility functions for naming nodes
        def get_raw_node_string(node):
            # Last whitespace-separated token of str(node) without its final character.
            # NOTE(review): presumably relies on the default '<... at 0x...>' repr - confirm.
            return str(node).split()[-1][:-1]

        def build_unique_node_name(node, parent):
            # The same node may appear in several graphs, so the owning operation is part of the id.
            node_id = get_raw_node_string(node)
            parent_id = get_raw_node_string(parent)
            return f'{str(node_id)}_{str(parent_id)}'

        def name_single_node(node):
            return f'{get_raw_node_string(node)}\n{node.name}'

        def name_node(
                node,
                color: str = None,
                additional_label: str = None,
            ):
            # Builds the pydot attribute dictionary (label, shape, tooltip, colors) for a node.
            attr_dict = {}
            label = name_single_node(node)
            if additional_label is not None:
                label += f'\n{additional_label}'
            attr_dict['label'] = label

            if isinstance(node, Variable):
                attr_dict['shape'] = 'ellipse'
                if color is not None:
                    attr_dict['fillcolor'] = color
                    attr_dict['style'] = 'filled'
                if node.value is not None:
                    attr_dict['tooltip'] = f'{node.info()}\n{np.min(node.value):.3e}, {np.max(node.value):.3e}, {np.mean(node.value):.3e}, {node.shape}'
                else:
                    attr_dict['tooltip'] = f'{node.info()}\n{node.value}'
            else:
                attr_dict['tooltip'] = f'{node.info()}'
                attr_dict['shape'] = 'rectangle'
            return attr_dict

        # Traverse the graph hierarchy and plot each subgraph to a cluster
        # TODO: How do we actually create the tree structure?
        # Queue entries are (operation owning the graph, graph); the root has no owner.
        next_tree_nodes = deque([(None, self.get_root_graph())])
        while next_tree_nodes:
            parent_op, g = next_tree_nodes.popleft()

            if parent_op is None:
                cluster_name = 'root'
            else:
                cluster_name = name_single_node(parent_op)
            cluster = pydot.Cluster(str(parent_op), label=cluster_name)

            for node in g.node_table:
                node_name = build_unique_node_name(node, parent_op)
                cluster.add_node(pydot.Node(node_name, **name_node(node)))

                # Add upstream edges to the dot graph
                node_index = g.node_table[node]
                for pred in g.rxgraph.predecessors(node_index):
                    pred_name = build_unique_node_name(pred, parent_op)
                    dot.add_edge(pydot.Edge(pred_name, node_name))

            if isinstance(parent_op, Loop):
                # Dashed blue edges connect a loop variable's body output back to its body input.
                for loop_var in parent_op.loop_vars:
                    body_input = build_unique_node_name(loop_var[0], parent_op)
                    body_output = build_unique_node_name(loop_var[2], parent_op)
                    dot.add_edge(pydot.Edge(body_output, body_input, color="blue", style="dashed"))

            if isinstance(parent_op, NewLoop):
                builder = parent_op.loop_builder
                for feedback in builder.feedbacks._int_input_to_feedback.values():
                    body_input = build_unique_node_name(feedback.internal_input, parent_op)
                    body_output = build_unique_node_name(feedback.output, parent_op)
                    dot.add_edge(pydot.Edge(body_output, body_input, color="blue", style="dashed"))

                # Re-add the loop's interface variables with highlight colors.
                for loop_input in builder.inputs:
                    node_name = build_unique_node_name(loop_input, parent_op)
                    cluster.add_node(pydot.Node(node_name, **name_node(loop_input, color='#e6f7ff')))
                for output in builder.outputs:
                    node_name = build_unique_node_name(output, parent_op)
                    cluster.add_node(pydot.Node(node_name, **name_node(output, color='#ffe6e6')))
                for accrued in builder.accrued:
                    node_name = build_unique_node_name(accrued, parent_op)
                    cluster.add_node(pydot.Node(node_name, **name_node(accrued, color='#ffe6e6', additional_label='(accrued)')))
                for stack in builder.stacked:
                    node_name = build_unique_node_name(stack, parent_op)
                    cluster.add_node(pydot.Node(node_name, **name_node(stack, color='#ffe6e6', additional_label='(stacked)')))

            # Queue subgraphs; the trailing (None, '(+N ops)') summary entry is skipped.
            for child_op, child_graph in insights['graph_tree'][g]:
                if isinstance(child_graph, Graph):
                    next_tree_nodes.append((child_op, child_graph))

            dot.add_subgraph(cluster)

        dot.write_svg(f'{filename}.svg')

    def visualize_adjacency_matrix(self):
        """
        Visualizes the adjacency matrix of the active graph.
        """
        self.active_graph.visualize_n2()

    def save_graph(self, filename: str = 'graph'):
        """Saves the active graph to file

        Parameters
        ----------
        filename : str, optional
            filename to save to, by default 'graph'
        """
        self.active_graph.save(filename)

    def get_root_graph(self):
        """
        Gets the root graph.

        Returns
        -------
        Graph
            The root graph.
        """
        # TODO: TEST TEST TEST TEST
        return self.graph_tree.value
class Tree:
    """
    Represents a node of a tree data structure.

    Attributes
    ----------
    value
        The value stored in the tree node.
    parent : Tree or None
        The parent node of the current node (None for a root node).
    children : list
        The list of child nodes of the current node.
    """

    def __init__(self, value, parent=None):
        """
        Initializes a new instance of the Tree class.

        Parameters
        ----------
        value
            The value stored in the tree node.
        parent : Tree, optional
            The parent node of the current node, by default None.
        """
        self.value = value
        self.children = []
        self.parent = parent

    def add_child(self, value):
        """
        Adds a child node to the current node.

        Parameters
        ----------
        value
            The value to be stored in the child node.

        Returns
        -------
        Tree
            The newly created child node.
        """
        child = Tree(value, parent=self)
        self.children.append(child)
        return child
class Namespace(Tree):
    """
    Represents a namespace node in the namespace tree.

    Attributes
    ----------
    name : str or None
        The name of the namespace.
    nodes : list
        The list of nodes in the namespace.
    prepend : str or None
        The string to prepend to the namespace name; equals ``name`` when
        no explicit value was given.
    child_names : set
        Set of child namespace names, initially empty.
    """

    def __init__(self, name, nodes=None, prepend=None, parent=None):
        """
        Initializes a new instance of the Namespace class.

        Parameters
        ----------
        name : str or None
            The name of the namespace.
        nodes : list, optional
            The list of nodes in the namespace. A fresh empty list is created
            when omitted, so namespaces never share their node list by accident.
        prepend : str, optional
            The string to prepend to the namespace name, by default ``name``.
        parent : Namespace, optional
            The parent namespace, by default None.
        """
        # A namespace carries no payload of its own, so the tree value is None.
        super().__init__(value=None, parent=parent)
        self.name = name
        self.nodes = [] if nodes is None else nodes
        self.prepend = name if prepend is None else prepend
        self.child_names = set()

    def add_child(self, name, nodes=None, prepend=None):
        """
        Adds a child namespace to the current namespace.

        Parameters
        ----------
        name : str
            The name of the child namespace.
        nodes : list, optional
            The list of nodes in the child namespace; a fresh empty list when omitted.
        prepend : str, optional
            The string to prepend to the child namespace name.

        Returns
        -------
        Namespace
            The newly created child namespace.
        """
        child = Namespace(name, nodes, prepend, parent=self)
        self.children.append(child)
        return child