From d25223d65b6259a71ce734dc4a431a64eec68f6f Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 17:27:42 -0500 Subject: [PATCH 01/10] First draft of major code refactoring and adding support for custom nodes. --- comfyui_to_python.py | 490 ++++++++++++++++++++----------------------- utils.py | 150 +++++++++++++ 2 files changed, 383 insertions(+), 257 deletions(-) create mode 100644 utils.py diff --git a/comfyui_to_python.py b/comfyui_to_python.py index 6a5ba4c..cb99127 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -1,6 +1,5 @@ -import glob +import copy import inspect -import json import logging import os import sys @@ -9,6 +8,9 @@ from typing import Dict, List, Any, Callable +from utils import import_custom_nodes, read_json_file, write_code_to_file, add_comfyui_directories_to_sys_path + + sys.path.append('../') from nodes import NODE_CLASS_MAPPINGS @@ -16,318 +18,292 @@ logging.basicConfig(level=logging.INFO) - -def read_json_file(file_path: str) -> dict: - """ - Reads a JSON file and returns its contents as a dictionary. - - Args: - file_path (str): The path to the JSON file. - - Returns: - dict: The contents of the JSON file as a dictionary. - - Raises: - FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path. - ValueError: If the file is not a valid JSON. - """ - - try: - with open(file_path, 'r') as file: - data = json.load(file) - return data - - except FileNotFoundError: - # Get the directory from the file_path - directory = os.path.dirname(file_path) - - # If the directory is an empty string (which means file is in the current directory), - # get the current working directory - if not directory: - directory = os.getcwd() - - # Find all JSON files in the directory - json_files = glob.glob(f"{directory}/*.json") - - # Format the list of JSON files as a string - json_files_str = "\n".join(json_files) - - raise FileNotFoundError(f"\n\nFile not found: {file_path}. JSON files in the directory:\n{json_files_str}") - - except json.JSONDecodeError: - raise ValueError(f"Invalid JSON format in file: {file_path}") -def determine_load_order(data: Dict) -> List: - """ - Determine the load order of each key in the provided dictionary. This code will place the - nodes without node dependencies first, then ensure that any node whose result is used - in another node will be added to the list in the order it should be executed. - - Args: - data (Dict): - The dictionary for which to determine the load order. - - Returns: - List: - A list of tuples where each tuple contains a key, its corresponding dictionary, - and a boolean indicating whether or not the function is dependent on the output of - a previous function, ordered by load order. - """ - - # Create a dictionary to keep track of visited nodes. - visited = {} - # Create a list to store the load order for functions - load_order = [] - # Boolean to indicate whether or not the class is a loader class that should not be - # reloaded during every loop - is_loader = False - - def dfs(key: str) -> None: +class ComfyUItoPython: + def __init__(self, input='workflow_api.json', queue_size=10): + self.input = input + self.queue_size = queue_size + self.BASE_NODE_CLASS_MAPPINGS = copy.deepcopy(NODE_CLASS_MAPPINGS) + self.generate_code() + + def generate_code(self): + """ + Main function to be executed. 
+ """ + import_custom_nodes() + # Load JSON data from the input file + prompt = read_json_file(self.input) + load_order = self.determine_load_order(prompt) + output_file = self.input.replace('.json', '.py') + code = self.generate_workflow(load_order, filename=output_file, queue_size=self.queue_size) + logging.info(code) + + def determine_load_order(self, data: Dict) -> List: """ - Depth-First Search function. + Determine the load order of each key in the provided dictionary. This code will place the + nodes without node dependencies first, then ensure that any node whose result is used + in another node will be added to the list in the order it should be executed. Args: - key (str): The key from which to start the DFS. + data (Dict): + The dictionary for which to determine the load order. Returns: - None + List: + A list of tuples where each tuple contains a key, its corresponding dictionary, + and a boolean indicating whether or not the function is dependent on the output of + a previous function, ordered by load order. """ - # Mark the node as visited. - visited[key] = True - inputs = data[key]['inputs'] - - # Loop over each input key. - for input_key, val in inputs.items(): - # If the value is a list and the first item in the list (which should be a key) - # has not been visited yet, then recursively apply dfs on the dependency. - if isinstance(val, list) and val[0] not in visited: - dfs(val[0]) - - # Add the key and its corresponding data to the load order list. - load_order.append((key, data[key], is_loader)) - - # Load Loader keys first - for key in data: - class_def = NODE_CLASS_MAPPINGS[data[key]['class_type']]() - if class_def.CATEGORY == 'loaders' or class_def.FUNCTION in ['encode'] or not any(isinstance(val, list) for val in data[key]['inputs'].values()): - is_loader = True + + # Create a dictionary to keep track of visited nodes. + visited = {} + # Create a list to store the load order for functions + load_order = [] + # Boolean to indicate whether or not the class is a loader class that should not be + # reloaded during every loop + is_loader = False + + def dfs(key: str) -> None: + """ + Depth-First Search function. + + Args: + key (str): The key from which to start the DFS. + + Returns: + None + """ + # Mark the node as visited. + visited[key] = True + inputs = data[key]['inputs'] + + # Loop over each input key. + for input_key, val in inputs.items(): + # If the value is a list and the first item in the list (which should be a key) + # has not been visited yet, then recursively apply dfs on the dependency. + if isinstance(val, list) and val[0] not in visited: + dfs(val[0]) + + # Add the key and its corresponding data to the load order list. + load_order.append((key, data[key], is_loader)) + + # Load Loader keys first + for key in data: + class_def = NODE_CLASS_MAPPINGS[data[key]['class_type']]() + if class_def.CATEGORY == 'loaders' or class_def.FUNCTION in ['encode'] or not any(isinstance(val, list) for val in data[key]['inputs'].values()): + is_loader = True + # If the key has not been visited, perform a DFS from that key. + if key not in visited: + dfs(key) + + # Reset is_loader bool + is_loader = False + # Loop over each key in the data. + for key in data: # If the key has not been visited, perform a DFS from that key. if key not in visited: dfs(key) - # Reset is_loader bool - is_loader = False - # Loop over each key in the data. - for key in data: - # If the key has not been visited, perform a DFS from that key. 
- if key not in visited: - dfs(key) - - return load_order - + return load_order -def create_function_call_code(obj_name: str, func: str, variable_name: str, is_loader: bool, **kwargs) -> str: - """ - This function generates Python code for a function call. - Args: - obj_name (str): The name of the initialized object. - func (str): The function to be called. - variable_name (str): The name of the variable that the function result should be assigned to. - is_loader (bool): Determines the code indentation. - **kwargs: The keyword arguments for the function. - - Returns: - str: The generated Python code. - """ - - def format_arg(key: str, value: any) -> str: - """Formats arguments based on key and value.""" - if key == 'noise_seed': - return f'{key}=random.randint(1, 2**64)' - elif isinstance(value, str): - value = value.replace("\n", "\\n") - return f'{key}="{value}"' - elif isinstance(value, dict) and 'variable_name' in value: - return f'{key}={value["variable_name"]}' - return f'{key}={value}' - - args = ', '.join(format_arg(key, value) for key, value in kwargs.items()) - - # Generate the Python code - code = f'{variable_name} = {obj_name}.{func}({args})\n' - - # If the code contains dependencies, indent the code because it will be placed inside - # of a for loop - if not is_loader: - code = f'\t{code}' + def create_function_call_code(self, obj_name: str, func: str, variable_name: str, is_loader: bool, **kwargs) -> str: + """ + This function generates Python code for a function call. - return code + Args: + obj_name (str): The name of the initialized object. + func (str): The function to be called. + variable_name (str): The name of the variable that the function result should be assigned to. + is_loader (bool): Determines the code indentation. + **kwargs: The keyword arguments for the function. + Returns: + str: The generated Python code. + """ -def update_inputs(inputs: Dict, executed_variables: Dict) -> Dict: - """ - Update inputs based on the executed variables. + def format_arg(key: str, value: any) -> str: + """Formats arguments based on key and value.""" + if key == 'noise_seed' or key == 'seed': + return f'{key}=random.randint(1, 2**64)' + elif isinstance(value, str): + value = value.replace("\n", "\\n").replace('"', "'") + return f'{key}="{value}"' + elif isinstance(value, dict) and 'variable_name' in value: + return f'{key}={value["variable_name"]}' + return f'{key}={value}' - Args: - inputs (Dict): Inputs dictionary to update. - executed_variables (Dict): Dictionary storing executed variable names. + args = ', '.join(format_arg(key, value) for key, value in kwargs.items()) - Returns: - Dict: Updated inputs dictionary. - """ - for key in inputs.keys(): - if isinstance(inputs[key], list) and inputs[key][0] in executed_variables.keys(): - inputs[key] = {'variable_name': f"{executed_variables[inputs[key][0]]}[{inputs[key][1]}]"} - return inputs + # Generate the Python code + code = f'{variable_name} = {obj_name}.{func}({args})\n' + # If the code contains dependencies, indent the code because it will be placed inside + # of a for loop + if not is_loader: + code = f'\t{code}' -def get_class_info(class_type: str) -> (str, str, str): - """ - Generates and returns necessary information about class type. 
+ return code - Args: - class_type (str): Class type - Returns: - class_type (str): Updated class type - import_statement (str): Import statement string - class_code (str): Class initialization code - """ - import_statement = class_type - class_code = f'{class_type.lower()} = {class_type}()' + def update_inputs(self, inputs: Dict, executed_variables: Dict) -> Dict: + """ + Update inputs based on the executed variables. - return class_type, import_statement, class_code + Args: + inputs (Dict): Inputs dictionary to update. + executed_variables (Dict): Dictionary storing executed variable names. + Returns: + Dict: Updated inputs dictionary. + """ + for key in inputs.keys(): + if isinstance(inputs[key], list) and inputs[key][0] in executed_variables.keys(): + inputs[key] = {'variable_name': f"get_value_at_index({executed_variables[inputs[key][0]]}, {inputs[key][1]})"} + return inputs -def assemble_python_code(import_statements: set, loader_code: List[str], code: List[str], queue_size: int) -> str: - """ - Generates final code string. - Args: - import_statements (set): A set of unique import statements - code (List[str]): A list of code strings - queue_size (int): Number of photos that will be generated by the script. + def get_class_info(self, class_type: str) -> (str, str, str): + """ + Generates and returns necessary information about class type. - Returns: - final_code (str): Generated final code as a string - """ - static_imports = ['import random', 'import torch'] - imports_code = [f"from nodes import {', '.join([class_name for class_name in import_statements])}" ] - main_function_code = f"def main():\n\t" + 'with torch.inference_mode():\n\t\t' + '\n\t\t'.join(loader_code) + f'\n\n\t\tfor q in range({queue_size}):\n\t\t' + '\n\t\t'.join(code) - final_code = '\n'.join(static_imports + ['import sys\nsys.path.append("../")'] + imports_code + ['', main_function_code, '', 'if __name__ == "__main__":', '\tmain()']) - final_code = black.format_str(final_code, mode=black.Mode()) + Args: + class_type (str): Class type - return final_code + Returns: + class_type (str): Updated class type + import_statement (str): Import statement string + class_code (str): Class initialization code + """ + import_statement = class_type + if class_type in self.BASE_NODE_CLASS_MAPPINGS.keys(): + class_code = f'{class_type.lower()} = {class_type}()' + else: + class_code = f'{class_type.lower()} = NODE_CLASS_MAPPINGS["{class_type}"]()' + return class_type, import_statement, class_code -def write_code_to_file(filename: str, code: str) -> None: - """ - Writes given code to a .py file. If the directory does not exist, it creates it. - Args: - filename (str): The name of the Python file to save the code to. - code (str): The code to save. - """ + def assemble_python_code(self, import_statements: set, loader_code: List[str], code: List[str], queue_size: int, custom_nodes=False) -> str: + """ + Generates final code string. - # Extract directory from the filename - directory = os.path.dirname(filename) + Args: + import_statements (set): A set of unique import statements + code (List[str]): A list of code strings + queue_size (int): Number of photos that will be generated by the script. + custom_nodes (bool): Whether to include custom nodes in the code. 
- # If the directory does not exist, create it - if directory and not os.path.exists(directory): - os.makedirs(directory) + Returns: + final_code (str): Generated final code as a string + """ + # Get the source code of the function as a string + add_comfyui_directories_to_sys_path_code = inspect.getsource(add_comfyui_directories_to_sys_path) + # Define static import statements required for the script + static_imports = ['import os', 'import random', 'import sys', 'import torch', f'\n{add_comfyui_directories_to_sys_path_code}', + '\n\nadd_comfyui_directories_to_sys_path()'] + # Check if custom nodes should be included + if custom_nodes: + static_imports.append('\nfrom utils import import_custom_nodes, get_value_at_index\n') + custom_nodes = 'import_custom_nodes()\n\t' + else: + custom_nodes = '' + # Create import statements for node classes + imports_code = [f"from nodes import {', '.join([class_name for class_name in import_statements])}" ] + # Assemble the main function code, including custom nodes if applicable + main_function_code = "def main():\n\t" + f'{custom_nodes}with torch.inference_mode():\n\t\t' + '\n\t\t'.join(loader_code) \ + + f'\n\n\t\tfor q in range({queue_size}):\n\t\t' + '\n\t\t'.join(code) + # Concatenate all parts to form the final code + final_code = '\n'.join(static_imports + imports_code + ['', main_function_code, '', 'if __name__ == "__main__":', '\tmain()']) + # Format the final code according to PEP 8 using the Black library + final_code = black.format_str(final_code, mode=black.Mode()) - # Save the code to a .py file - with open(filename, 'w') as file: - file.write(code) + return final_code -def get_function_parameters(func: Callable) -> List: - """Get the names of a function's parameters. - Args: - func (Callable): The function whose parameters we want to inspect. + def get_function_parameters(self, func: Callable) -> List: + """Get the names of a function's parameters. - Returns: - List: A list containing the names of the function's parameters. - """ - signature = inspect.signature(func) - parameters = {name: param.default if param.default != param.empty else None - for name, param in signature.parameters.items()} - return list(parameters.keys()) + Args: + func (Callable): The function whose parameters we want to inspect. + Returns: + List: A list containing the names of the function's parameters. + """ + signature = inspect.signature(func) + parameters = {name: param.default if param.default != param.empty else None + for name, param in signature.parameters.items()} + return list(parameters.keys()) -def generate_workflow(load_order: List, filename: str = 'generated_code_workflow.py', queue_size: int = 10) -> str: - """ - Generate the execution code based on the load order. - Args: - load_order (List): A list of tuples representing the load order. - filename (str): The name of the Python file to which the code should be saved. - Defaults to 'generated_code_workflow.py'. - queue_size (int): The number of photos that will be created by the script. + def generate_workflow(self, load_order: List, filename: str = 'generated_code_workflow.py', queue_size: int = 10) -> str: + """ + Generate the execution code based on the load order. - Returns: - str: Generated execution code as a string. - """ + Args: + load_order (List): A list of tuples representing the load order. + filename (str): The name of the Python file to which the code should be saved. + Defaults to 'generated_code_workflow.py'. + queue_size (int): The number of photos that will be created by the script. 
- # Create the necessary data structures to hold imports and generated code - import_statements, executed_variables, loader_code, code = set(), {}, [], [] - # This dictionary will store the names of the objects that we have already initialized - initialized_objects = {} + Returns: + str: Generated execution code as a string. + """ - # Loop over each dictionary in the load order list - for idx, data, is_loader in load_order: + # Create the necessary data structures to hold imports and generated code + import_statements, executed_variables, loader_code, code = set(['NODE_CLASS_MAPPINGS']), {}, [], [] + # This dictionary will store the names of the objects that we have already initialized + initialized_objects = {} - # Generate class definition and inputs from the data - inputs, class_type = data['inputs'], data['class_type'] - class_def = NODE_CLASS_MAPPINGS[class_type]() + custom_nodes = False + # Loop over each dictionary in the load order list + for idx, data, is_loader in load_order: - # If the class hasn't been initialized yet, initialize it and generate the import statements - if class_type not in initialized_objects: - class_type, import_statement, class_code = get_class_info(class_type) - initialized_objects[class_type] = class_type.lower() - import_statements.add(import_statement) - loader_code.append(class_code) + # Generate class definition and inputs from the data + inputs, class_type = data['inputs'], data['class_type'] + class_def = NODE_CLASS_MAPPINGS[class_type]() - # Get all possible parameters for class_def - class_def_params = get_function_parameters(getattr(class_def, class_def.FUNCTION)) + # If the class hasn't been initialized yet, initialize it and generate the import statements + if class_type not in initialized_objects: + # No need to use preview image nodes since we are executing the script in a terminal + if class_type == 'PreviewImage': + continue - # Remove any keyword arguments from **inputs if they are not in class_def_params - inputs = {key: value for key, value in inputs.items() if key in class_def_params} + class_type, import_statement, class_code = self.get_class_info(class_type) + initialized_objects[class_type] = class_type.lower() + if class_type in self.BASE_NODE_CLASS_MAPPINGS.keys(): + import_statements.add(import_statement) + if class_type not in self.BASE_NODE_CLASS_MAPPINGS.keys(): + custom_nodes = True + loader_code.append(class_code) - # Create executed variable and generate code - executed_variables[idx] = f'{class_type.lower()}_{idx}' - inputs = update_inputs(inputs, executed_variables) + # Get all possible parameters for class_def + class_def_params = self.get_function_parameters(getattr(class_def, class_def.FUNCTION)) - if is_loader: - loader_code.append(create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) - else: - code.append(create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) + # Remove any keyword arguments from **inputs if they are not in class_def_params + inputs = {key: value for key, value in inputs.items() if key in class_def_params} - # Generate final code by combining imports and code, and wrap them in a main function - final_code = assemble_python_code(import_statements, loader_code, code, queue_size) + # Create executed variable and generate code + executed_variables[idx] = f'{class_type.lower()}_{idx}' + inputs = self.update_inputs(inputs, executed_variables) - # Save the code to a 
.py file - write_code_to_file(filename, final_code) + if is_loader: + loader_code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) + else: + code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) - return final_code + # Generate final code by combining imports and code, and wrap them in a main function + final_code = self.assemble_python_code(import_statements, loader_code, code, queue_size, custom_nodes) + # Save the code to a .py file + write_code_to_file(filename, final_code) -def main(input, queue_size=10): - """ - Main function to be executed. - """ - # Load JSON data from the input file - prompt = read_json_file(input) - load_order = determine_load_order(prompt) - output_file = input.replace('.json', '.py') - code = generate_workflow(load_order, filename=output_file, queue_size=queue_size) - logging.info(code) + return final_code if __name__ == '__main__': - input = 'workflow_api.json' + input = 'workflow_api_serge.json' queue_size = 10 - main(input, queue_size) + ComfyUItoPython(input=input, queue_size=queue_size) diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..61f8d86 --- /dev/null +++ b/utils.py @@ -0,0 +1,150 @@ +import asyncio +import json +import glob +import os +from typing import Sequence, Mapping, Any, Union +import sys + +sys.path.append('../') + +import execution +from nodes import init_custom_nodes + + +def read_json_file(file_path: str) -> dict: + """ + Reads a JSON file and returns its contents as a dictionary. + + Args: + file_path (str): The path to the JSON file. + + Returns: + dict: The contents of the JSON file as a dictionary. + + Raises: + FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path. + ValueError: If the file is not a valid JSON. + """ + + try: + with open(file_path, 'r') as file: + data = json.load(file) + return data + + except FileNotFoundError: + # Get the directory from the file_path + directory = os.path.dirname(file_path) + + # If the directory is an empty string (which means file is in the current directory), + # get the current working directory + if not directory: + directory = os.getcwd() + + # Find all JSON files in the directory + json_files = glob.glob(f"{directory}/*.json") + + # Format the list of JSON files as a string + json_files_str = "\n".join(json_files) + + raise FileNotFoundError(f"\n\nFile not found: {file_path}. JSON files in the directory:\n{json_files_str}") + + except json.JSONDecodeError: + raise ValueError(f"Invalid JSON format in file: {file_path}") + + +def write_code_to_file(filename: str, code: str) -> None: + """ + Writes given code to a .py file. If the directory does not exist, it creates it. + + Args: + filename (str): The name of the Python file to save the code to. + code (str): The code to save. + """ + + # Extract directory from the filename + directory = os.path.dirname(filename) + + # If the directory does not exist, create it + if directory and not os.path.exists(directory): + os.makedirs(directory) + + # Save the code to a .py file + with open(filename, 'w') as file: + file.write(code) + + +def import_custom_nodes() -> None: + """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS + + This function sets up a new asyncio event loop, initializes the PromptServer, + creates a PromptQueue, and initializes the custom nodes. 
+ """ + import server + + # Creating a new event loop and setting it as the default loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Creating an instance of PromptServer with the loop + server_instance = server.PromptServer(loop) + execution.PromptQueue(server_instance) + + # Initializing custom nodes + init_custom_nodes() + + +def add_comfyui_directories_to_sys_path() -> None: + """ + Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI' and 'ComfyUI-to-Python-Extension'. + Once found, the directories are added to sys.path. + """ + start_path = os.getcwd() # Get the current working directory + + def search_directory(path: str) -> None: + # Check if the current directory contains 'ComfyUI' or 'ComfyUI-to-Python-Extension' + for directory_name in ['ComfyUI', 'ComfyUI-to-Python-Extension']: + if directory_name in os.listdir(path): + directory_path = os.path.join(path, directory_name) + sys.path.append(directory_path) + print(f"'{directory_name}' found and added to sys.path: {directory_path}") + + # Get the parent directory + parent_directory = os.path.dirname(path) + + # If the parent directory is the same as the current directory, we've reached the root and stop the search + if parent_directory == path: + return + + # Recursively call the function with the parent directory + search_directory(parent_directory) + + # Start the search from the current working directory + search_directory(start_path) + +# Example usage +add_comfyui_directories_to_sys_path() + + + +def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: + """Returns the value at the given index of a sequence or mapping. + + If the object is a sequence (like list or string), returns the value at the given index. + If the object is a mapping (like a dictionary), returns the value at the index-th key. + + Some return a dictionary, in these cases, we look for the "results" key + + Args: + obj (Union[Sequence, Mapping]): The object to retrieve the value from. + index (int): The index of the value to retrieve. + + Returns: + Any: The value at the given index. + + Raises: + IndexError: If the index is out of bounds for the object and the object is not a mapping. + """ + try: + return obj[index] + except KeyError: + return obj['result'][index] From e46283db70ce0b34ee6397086d9f7585b4c5c307 Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 18:07:45 -0500 Subject: [PATCH 02/10] Major refactor to break up code into smaller, task based classes. 
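This refactor splits the conversion into four cooperating classes: FileHandler (JSON in, Python out), LoadOrderDeterminer (dependency-ordered execution list), CodeGenerator (code assembly), and ComfyUItoPython, which orchestrates them in execute(). A rough usage sketch, mirroring the __main__ block in the diff below (the file paths are placeholders, and it assumes comfyui_to_python.py is importable inside a ComfyUI environment with the required nodes available):

```
from comfyui_to_python import ComfyUItoPython

# Constructing the class runs execute(): import custom nodes, read the workflow
# JSON, determine the node load order, generate the Python code, and write it
# to output_file.
ComfyUItoPython(
    input_file="workflow_api.json",   # placeholder: exported ComfyUI workflow JSON
    output_file="workflow_api.py",    # placeholder: destination for the generated script
    queue_size=10,                    # number of times the generated workflow loop runs
)
```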
--- comfyui_to_python.py | 496 +++++++++++++++++++++++++++---------------- utils.py | 66 ------ 2 files changed, 311 insertions(+), 251 deletions(-) diff --git a/comfyui_to_python.py b/comfyui_to_python.py index cb99127..f1c1b4f 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -1,14 +1,16 @@ import copy +import glob import inspect +import json import logging import os import sys import black -from typing import Dict, List, Any, Callable +from typing import Dict, List, Any, Callable, Tuple -from utils import import_custom_nodes, read_json_file, write_code_to_file, add_comfyui_directories_to_sys_path +from utils import import_custom_nodes, add_comfyui_directories_to_sys_path sys.path.append('../') @@ -18,185 +20,287 @@ logging.basicConfig(level=logging.INFO) - -class ComfyUItoPython: - def __init__(self, input='workflow_api.json', queue_size=10): - self.input = input - self.queue_size = queue_size - self.BASE_NODE_CLASS_MAPPINGS = copy.deepcopy(NODE_CLASS_MAPPINGS) - self.generate_code() +class FileHandler: + """Handles reading and writing files. - def generate_code(self): - """ - Main function to be executed. - """ - import_custom_nodes() - # Load JSON data from the input file - prompt = read_json_file(self.input) - load_order = self.determine_load_order(prompt) - output_file = self.input.replace('.json', '.py') - code = self.generate_workflow(load_order, filename=output_file, queue_size=self.queue_size) - logging.info(code) - - def determine_load_order(self, data: Dict) -> List: + This class provides methods to read JSON data from an input file and write code to an output file. + """ + + @staticmethod + def read_json_file(file_path: str) -> dict: """ - Determine the load order of each key in the provided dictionary. This code will place the - nodes without node dependencies first, then ensure that any node whose result is used - in another node will be added to the list in the order it should be executed. + Reads a JSON file and returns its contents as a dictionary. Args: - data (Dict): - The dictionary for which to determine the load order. + file_path (str): The path to the JSON file. Returns: - List: - A list of tuples where each tuple contains a key, its corresponding dictionary, - and a boolean indicating whether or not the function is dependent on the output of - a previous function, ordered by load order. + dict: The contents of the JSON file as a dictionary. + + Raises: + FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path. + ValueError: If the file is not a valid JSON. """ - # Create a dictionary to keep track of visited nodes. - visited = {} - # Create a list to store the load order for functions - load_order = [] - # Boolean to indicate whether or not the class is a loader class that should not be - # reloaded during every loop - is_loader = False - - def dfs(key: str) -> None: - """ - Depth-First Search function. - - Args: - key (str): The key from which to start the DFS. - - Returns: - None - """ - # Mark the node as visited. - visited[key] = True - inputs = data[key]['inputs'] - - # Loop over each input key. - for input_key, val in inputs.items(): - # If the value is a list and the first item in the list (which should be a key) - # has not been visited yet, then recursively apply dfs on the dependency. - if isinstance(val, list) and val[0] not in visited: - dfs(val[0]) - - # Add the key and its corresponding data to the load order list. 
- load_order.append((key, data[key], is_loader)) - - # Load Loader keys first - for key in data: - class_def = NODE_CLASS_MAPPINGS[data[key]['class_type']]() - if class_def.CATEGORY == 'loaders' or class_def.FUNCTION in ['encode'] or not any(isinstance(val, list) for val in data[key]['inputs'].values()): - is_loader = True - # If the key has not been visited, perform a DFS from that key. - if key not in visited: - dfs(key) + try: + with open(file_path, 'r') as file: + data = json.load(file) + return data - # Reset is_loader bool - is_loader = False - # Loop over each key in the data. - for key in data: - # If the key has not been visited, perform a DFS from that key. - if key not in visited: - dfs(key) + except FileNotFoundError: + # Get the directory from the file_path + directory = os.path.dirname(file_path) - return load_order + # If the directory is an empty string (which means file is in the current directory), + # get the current working directory + if not directory: + directory = os.getcwd() + # Find all JSON files in the directory + json_files = glob.glob(f"{directory}/*.json") - def create_function_call_code(self, obj_name: str, func: str, variable_name: str, is_loader: bool, **kwargs) -> str: - """ - This function generates Python code for a function call. + # Format the list of JSON files as a string + json_files_str = "\n".join(json_files) + + raise FileNotFoundError(f"\n\nFile not found: {file_path}. JSON files in the directory:\n{json_files_str}") + + except json.JSONDecodeError: + raise ValueError(f"Invalid JSON format in file: {file_path}") + + @staticmethod + def write_code_to_file(file_path: str, code: str) -> None: + """Write the specified code to a Python file. Args: - obj_name (str): The name of the initialized object. - func (str): The function to be called. - variable_name (str): The name of the variable that the function result should be assigned to. - is_loader (bool): Determines the code indentation. - **kwargs: The keyword arguments for the function. + file_path (str): The path to the Python file. + code (str): The code to write to the file. Returns: - str: The generated Python code. + None """ + # Extract directory from the filename + directory = os.path.dirname(file_path) - def format_arg(key: str, value: any) -> str: - """Formats arguments based on key and value.""" - if key == 'noise_seed' or key == 'seed': - return f'{key}=random.randint(1, 2**64)' - elif isinstance(value, str): - value = value.replace("\n", "\\n").replace('"', "'") - return f'{key}="{value}"' - elif isinstance(value, dict) and 'variable_name' in value: - return f'{key}={value["variable_name"]}' - return f'{key}={value}' + # If the directory does not exist, create it + if directory and not os.path.exists(directory): + os.makedirs(directory) - args = ', '.join(format_arg(key, value) for key, value in kwargs.items()) + # Save the code to a .py file + with open(file_path, 'w') as file: + file.write(code) - # Generate the Python code - code = f'{variable_name} = {obj_name}.{func}({args})\n' - # If the code contains dependencies, indent the code because it will be placed inside - # of a for loop - if not is_loader: - code = f'\t{code}' +class LoadOrderDeterminer: + """Determine the load order of each key in the provided dictionary. - return code + This class places the nodes without node dependencies first, then ensures that any node whose + result is used in another node will be added to the list in the order it should be executed. 
+ Attributes: + data (Dict): The dictionary for which to determine the load order. + node_class_mappings (Dict): Mappings of node classes. + """ - def update_inputs(self, inputs: Dict, executed_variables: Dict) -> Dict: + def __init__(self, data: Dict, node_class_mappings: Dict): + """Initialize the LoadOrderDeterminer with the given data and node class mappings. + + Args: + data (Dict): The dictionary for which to determine the load order. + node_class_mappings (Dict): Mappings of node classes. """ - Update inputs based on the executed variables. + self.data = data + self.node_class_mappings = node_class_mappings + self.visited = {} + self.load_order = [] + self.is_special_function = False + + def determine_load_order(self) -> List[Tuple[str, Dict, bool]]: + """Determine the load order for the given data. + + Returns: + List[Tuple[str, Dict, bool]]: A list of tuples representing the load order. + """ + self._load_special_functions_first() + self.is_special_function = False + for key in self.data: + if key not in self.visited: + self._dfs(key) + return self.load_order + + def _dfs(self, key: str) -> None: + """Depth-First Search function to determine the load order. Args: - inputs (Dict): Inputs dictionary to update. - executed_variables (Dict): Dictionary storing executed variable names. + key (str): The key from which to start the DFS. Returns: - Dict: Updated inputs dictionary. + None """ - for key in inputs.keys(): - if isinstance(inputs[key], list) and inputs[key][0] in executed_variables.keys(): - inputs[key] = {'variable_name': f"get_value_at_index({executed_variables[inputs[key][0]]}, {inputs[key][1]})"} - return inputs + # Mark the node as visited. + self.visited[key] = True + inputs = self.data[key]['inputs'] + # Loop over each input key. + for input_key, val in inputs.items(): + # If the value is a list and the first item in the list has not been visited yet, + # then recursively apply DFS on the dependency. + if isinstance(val, list) and val[0] not in self.visited: + self._dfs(val[0]) + # Add the key and its corresponding data to the load order list. + self.load_order.append((key, self.data[key], self.is_special_function)) + + def _load_special_functions_first(self) -> None: + """Load functions without dependencies, loaderes, and encoders first. + + Returns: + None + """ + # Iterate over each key in the data to check for loader keys. + for key in self.data: + class_def = self.node_class_mappings[self.data[key]['class_type']]() + # Check if the class is a loader class or meets specific conditions. + if (class_def.CATEGORY == 'loaders' or + class_def.FUNCTION in ['encode'] or + not any(isinstance(val, list) for val in self.data[key]['inputs'].values())): + self.is_special_function = True + # If the key has not been visited, perform a DFS from that key. + if key not in self.visited: + self._dfs(key) + + +class CodeGenerator: + """Generates Python code for a workflow based on the load order. + Attributes: + node_class_mappings (Dict): Mappings of node classes. + base_node_class_mappings (Dict): Base mappings of node classes. + """ - def get_class_info(self, class_type: str) -> (str, str, str): + def __init__(self, node_class_mappings: Dict, base_node_class_mappings: Dict): + """Initialize the CodeGenerator with given node class mappings. + + Args: + node_class_mappings (Dict): Mappings of node classes. + base_node_class_mappings (Dict): Base mappings of node classes. """ - Generates and returns necessary information about class type. 
+ self.node_class_mappings = node_class_mappings + self.base_node_class_mappings = base_node_class_mappings + + def generate_workflow(self, load_order: List, filename: str = 'generated_code_workflow.py', queue_size: int = 10) -> str: + """Generate the execution code based on the load order. Args: - class_type (str): Class type + load_order (List): A list of tuples representing the load order. + filename (str): The name of the Python file to which the code should be saved. + Defaults to 'generated_code_workflow.py'. + queue_size (int): The number of photos that will be created by the script. Returns: - class_type (str): Updated class type - import_statement (str): Import statement string - class_code (str): Class initialization code + str: Generated execution code as a string. """ - import_statement = class_type - if class_type in self.BASE_NODE_CLASS_MAPPINGS.keys(): - class_code = f'{class_type.lower()} = {class_type}()' - else: - class_code = f'{class_type.lower()} = NODE_CLASS_MAPPINGS["{class_type}"]()' + # Create the necessary data structures to hold imports and generated code + import_statements, executed_variables, special_functions_code, code = set(['NODE_CLASS_MAPPINGS']), {}, [], [] + # This dictionary will store the names of the objects that we have already initialized + initialized_objects = {} - return class_type, import_statement, class_code + custom_nodes = False + # Loop over each dictionary in the load order list + for idx, data, is_special_function in load_order: + # Generate class definition and inputs from the data + inputs, class_type = data['inputs'], data['class_type'] + class_def = self.node_class_mappings[class_type]() - def assemble_python_code(self, import_statements: set, loader_code: List[str], code: List[str], queue_size: int, custom_nodes=False) -> str: + # If the class hasn't been initialized yet, initialize it and generate the import statements + if class_type not in initialized_objects: + # No need to use preview image nodes since we are executing the script in a terminal + if class_type == 'PreviewImage': + continue + + class_type, import_statement, class_code = self.get_class_info(class_type) + initialized_objects[class_type] = class_type.lower() + if class_type in self.base_node_class_mappings.keys(): + import_statements.add(import_statement) + if class_type not in self.base_node_class_mappings.keys(): + custom_nodes = True + special_functions_code.append(class_code) + + # Get all possible parameters for class_def + class_def_params = self.get_function_parameters(getattr(class_def, class_def.FUNCTION)) + + # Remove any keyword arguments from **inputs if they are not in class_def_params + inputs = {key: value for key, value in inputs.items() if key in class_def_params} + + # Create executed variable and generate code + executed_variables[idx] = f'{class_type.lower()}_{idx}' + inputs = self.update_inputs(inputs, executed_variables) + + if is_special_function: + special_functions_code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs)) + else: + code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs)) + + # Generate final code by combining imports and code, and wrap them in a main function + final_code = self.assemble_python_code(import_statements, special_functions_code, code, queue_size, custom_nodes) + + return final_code + + def create_function_call_code(self, 
obj_name: str, func: str, variable_name: str, is_special_function: bool, **kwargs) -> str: + """Generate Python code for a function call. + + Args: + obj_name (str): The name of the initialized object. + func (str): The function to be called. + variable_name (str): The name of the variable that the function result should be assigned to. + is_special_function (bool): Determines the code indentation. + **kwargs: The keyword arguments for the function. + + Returns: + str: The generated Python code. """ - Generates final code string. + args = ', '.join(self.format_arg(key, value) for key, value in kwargs.items()) + + # Generate the Python code + code = f'{variable_name} = {obj_name}.{func}({args})\n' + + # If the code contains dependencies and is not a loader or encoder, indent the code because it will be placed inside + # of a for loop + if not is_special_function: + code = f'\t{code}' + + return code + + def format_arg(self, key: str, value: any) -> str: + """Formats arguments based on key and value. Args: - import_statements (set): A set of unique import statements - code (List[str]): A list of code strings + key (str): Argument key. + value (any): Argument value. + + Returns: + str: Formatted argument as a string. + """ + if key == 'noise_seed' or key == 'seed': + return f'{key}=random.randint(1, 2**64)' + elif isinstance(value, str): + value = value.replace("\n", "\\n").replace('"', "'") + return f'{key}="{value}"' + elif isinstance(value, dict) and 'variable_name' in value: + return f'{key}={value["variable_name"]}' + return f'{key}={value}' + + def assemble_python_code(self, import_statements: set, speical_functions_code: List[str], code: List[str], queue_size: int, custom_nodes=False) -> str: + """Generates the final code string. + + Args: + import_statements (set): A set of unique import statements. + speical_functions_code (List[str]): A list of special functions code strings. + code (List[str]): A list of code strings. queue_size (int): Number of photos that will be generated by the script. custom_nodes (bool): Whether to include custom nodes in the code. Returns: - final_code (str): Generated final code as a string + str: Generated final code as a string. """ # Get the source code of the function as a string add_comfyui_directories_to_sys_path_code = inspect.getsource(add_comfyui_directories_to_sys_path) @@ -212,7 +316,7 @@ def assemble_python_code(self, import_statements: set, loader_code: List[str], c # Create import statements for node classes imports_code = [f"from nodes import {', '.join([class_name for class_name in import_statements])}" ] # Assemble the main function code, including custom nodes if applicable - main_function_code = "def main():\n\t" + f'{custom_nodes}with torch.inference_mode():\n\t\t' + '\n\t\t'.join(loader_code) \ + main_function_code = "def main():\n\t" + f'{custom_nodes}with torch.inference_mode():\n\t\t' + '\n\t\t'.join(speical_functions_code) \ + f'\n\n\t\tfor q in range({queue_size}):\n\t\t' + '\n\t\t'.join(code) # Concatenate all parts to form the final code final_code = '\n'.join(static_imports + imports_code + ['', main_function_code, '', 'if __name__ == "__main__":', '\tmain()']) @@ -221,10 +325,25 @@ def assemble_python_code(self, import_statements: set, loader_code: List[str], c return final_code + def get_class_info(self, class_type: str) -> Tuple[str, str, str]: + """Generates and returns necessary information about class type. + + Args: + class_type (str): Class type. 
+ + Returns: + Tuple[str, str, str]: Updated class type, import statement string, class initialization code. + """ + import_statement = class_type + if class_type in self.base_node_class_mappings.keys(): + class_code = f'{class_type.lower()} = {class_type}()' + else: + class_code = f'{class_type.lower()} = NODE_CLASS_MAPPINGS["{class_type}"]()' + return class_type, import_statement, class_code def get_function_parameters(self, func: Callable) -> List: - """Get the names of a function's parameters. + """Get the names of a function's parameters. Args: func (Callable): The function whose parameters we want to inspect. @@ -235,75 +354,82 @@ def get_function_parameters(self, func: Callable) -> List: signature = inspect.signature(func) parameters = {name: param.default if param.default != param.empty else None for name, param in signature.parameters.items()} - return list(parameters.keys()) - + return list(parameters.keys()) - def generate_workflow(self, load_order: List, filename: str = 'generated_code_workflow.py', queue_size: int = 10) -> str: - """ - Generate the execution code based on the load order. + def update_inputs(self, inputs: Dict, executed_variables: Dict) -> Dict: + """Update inputs based on the executed variables. Args: - load_order (List): A list of tuples representing the load order. - filename (str): The name of the Python file to which the code should be saved. - Defaults to 'generated_code_workflow.py'. - queue_size (int): The number of photos that will be created by the script. + inputs (Dict): Inputs dictionary to update. + executed_variables (Dict): Dictionary storing executed variable names. Returns: - str: Generated execution code as a string. + Dict: Updated inputs dictionary. """ + for key in inputs.keys(): + if isinstance(inputs[key], list) and inputs[key][0] in executed_variables.keys(): + inputs[key] = {'variable_name': f"get_value_at_index({executed_variables[inputs[key][0]]}, {inputs[key][1]})"} + return inputs + - # Create the necessary data structures to hold imports and generated code - import_statements, executed_variables, loader_code, code = set(['NODE_CLASS_MAPPINGS']), {}, [], [] - # This dictionary will store the names of the objects that we have already initialized - initialized_objects = {} - - custom_nodes = False - # Loop over each dictionary in the load order list - for idx, data, is_loader in load_order: +class ComfyUItoPython: + """Main workflow to generate Python code from a workflow_api.json file. - # Generate class definition and inputs from the data - inputs, class_type = data['inputs'], data['class_type'] - class_def = NODE_CLASS_MAPPINGS[class_type]() + Attributes: + input_file (str): Path to the input JSON file. + output_file (str): Path to the output Python file. + queue_size (int): The number of photos that will be created by the script. + node_class_mappings (Dict): Mappings of node classes. + base_node_class_mappings (Dict): Base mappings of node classes. + """ - # If the class hasn't been initialized yet, initialize it and generate the import statements - if class_type not in initialized_objects: - # No need to use preview image nodes since we are executing the script in a terminal - if class_type == 'PreviewImage': - continue + def __init__(self, input_file: str, output_file: str, queue_size: int = 10, node_class_mappings: Dict = NODE_CLASS_MAPPINGS): + """Initialize the ComfyUItoPython class with the given parameters. 
- class_type, import_statement, class_code = self.get_class_info(class_type) - initialized_objects[class_type] = class_type.lower() - if class_type in self.BASE_NODE_CLASS_MAPPINGS.keys(): - import_statements.add(import_statement) - if class_type not in self.BASE_NODE_CLASS_MAPPINGS.keys(): - custom_nodes = True - loader_code.append(class_code) + Args: + input_file (str): Path to the input JSON file. + output_file (str): Path to the output Python file. + queue_size (int): The number of times a workflow will be executed by the script. Defaults to 10. + node_class_mappings (Dict): Mappings of node classes. Defaults to NODE_CLASS_MAPPINGS. + """ + self.input_file = input_file + self.output_file = output_file + self.queue_size = queue_size + self.node_class_mappings = node_class_mappings + self.base_node_class_mappings = copy.deepcopy(self.node_class_mappings) + self.execute() - # Get all possible parameters for class_def - class_def_params = self.get_function_parameters(getattr(class_def, class_def.FUNCTION)) + def execute(self): + """Execute the main workflow to generate Python code. - # Remove any keyword arguments from **inputs if they are not in class_def_params - inputs = {key: value for key, value in inputs.items() if key in class_def_params} + Returns: + None + """ + # Step 1: Import all custom nodes + import_custom_nodes() - # Create executed variable and generate code - executed_variables[idx] = f'{class_type.lower()}_{idx}' - inputs = self.update_inputs(inputs, executed_variables) + # Step 2: Read JSON data from the input file + data = FileHandler.read_json_file(self.input_file) - if is_loader: - loader_code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) - else: - code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_loader, **inputs)) + # Step 3: Determine the load order + load_order_determiner = LoadOrderDeterminer(data, self.node_class_mappings) + load_order = load_order_determiner.determine_load_order() - # Generate final code by combining imports and code, and wrap them in a main function - final_code = self.assemble_python_code(import_statements, loader_code, code, queue_size, custom_nodes) + # Step 4: Generate the workflow code + code_generator = CodeGenerator(self.node_class_mappings, self.base_node_class_mappings) + generated_code = code_generator.generate_workflow(load_order, filename=self.output_file, queue_size=self.queue_size) - # Save the code to a .py file - write_code_to_file(filename, final_code) + # Step 5: Write the generated code to a file + FileHandler.write_code_to_file(self.output_file, generated_code) - return final_code + print(f"Code successfully generated and written to {self.output_file}") if __name__ == '__main__': - input = 'workflow_api_serge.json' + # Update class parameters here + input_file = 'workflow_api_serge.json' + output_file = 'workflow_api_serge.py' queue_size = 10 - ComfyUItoPython(input=input, queue_size=queue_size) + + # Convert ComfyUI workflow to Python + ComfyUItoPython(input_file=input_file, output_file=output_file, queue_size=queue_size) diff --git a/utils.py b/utils.py index 61f8d86..5a1820a 100644 --- a/utils.py +++ b/utils.py @@ -9,68 +9,6 @@ import execution from nodes import init_custom_nodes - - -def read_json_file(file_path: str) -> dict: - """ - Reads a JSON file and returns its contents as a dictionary. - - Args: - file_path (str): The path to the JSON file. 
- - Returns: - dict: The contents of the JSON file as a dictionary. - - Raises: - FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path. - ValueError: If the file is not a valid JSON. - """ - - try: - with open(file_path, 'r') as file: - data = json.load(file) - return data - - except FileNotFoundError: - # Get the directory from the file_path - directory = os.path.dirname(file_path) - - # If the directory is an empty string (which means file is in the current directory), - # get the current working directory - if not directory: - directory = os.getcwd() - - # Find all JSON files in the directory - json_files = glob.glob(f"{directory}/*.json") - - # Format the list of JSON files as a string - json_files_str = "\n".join(json_files) - - raise FileNotFoundError(f"\n\nFile not found: {file_path}. JSON files in the directory:\n{json_files_str}") - - except json.JSONDecodeError: - raise ValueError(f"Invalid JSON format in file: {file_path}") - - -def write_code_to_file(filename: str, code: str) -> None: - """ - Writes given code to a .py file. If the directory does not exist, it creates it. - - Args: - filename (str): The name of the Python file to save the code to. - code (str): The code to save. - """ - - # Extract directory from the filename - directory = os.path.dirname(filename) - - # If the directory does not exist, create it - if directory and not os.path.exists(directory): - os.makedirs(directory) - - # Save the code to a .py file - with open(filename, 'w') as file: - file.write(code) def import_custom_nodes() -> None: @@ -121,10 +59,6 @@ def search_directory(path: str) -> None: # Start the search from the current working directory search_directory(start_path) -# Example usage -add_comfyui_directories_to_sys_path() - - def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: """Returns the value at the given index of a sequence or mapping. From a26219b63926b98f49b65c3aff0c9b7c4b02cc32 Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 19:41:29 -0500 Subject: [PATCH 03/10] Hotfix for node class names with trailing white spaces. Removed the need to import functions from the utils file when running the generated code. 
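The whitespace fix matters because variable names in the generated script are derived directly from each node's class_type, so a node registered with a trailing space would otherwise produce an invalid Python identifier. A small illustration with a made-up node name (hypothetical, not a real node):

```
class_type = "MyCustomUpscaler "   # hypothetical class_type containing a trailing space
idx = "3"

# Before the fix: the space survives into the variable name, which is not a valid identifier.
broken = f"{class_type.lower()}_{idx}"          # "mycustomupscaler _3"
# After the fix: .strip() removes the stray whitespace.
fixed = f"{class_type.lower().strip()}_{idx}"   # "mycustomupscaler_3"
```

The second half of this commit inlines the helper functions from utils.py into the generated script via inspect.getsource, so the output file runs standalone without importing from utils.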
--- comfyui_to_python.py | 22 ++++++++++++---------- utils.py | 27 +++++++++++---------------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/comfyui_to_python.py b/comfyui_to_python.py index f1c1b4f..4f7f48c 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -10,7 +10,7 @@ from typing import Dict, List, Any, Callable, Tuple -from utils import import_custom_nodes, add_comfyui_directories_to_sys_path +from utils import import_custom_nodes, add_comfyui_directory_to_sys_path, get_value_at_index sys.path.append('../') @@ -218,7 +218,7 @@ def generate_workflow(self, load_order: List, filename: str = 'generated_code_wo continue class_type, import_statement, class_code = self.get_class_info(class_type) - initialized_objects[class_type] = class_type.lower() + initialized_objects[class_type] = class_type.lower().strip() if class_type in self.base_node_class_mappings.keys(): import_statements.add(import_statement) if class_type not in self.base_node_class_mappings.keys(): @@ -232,7 +232,7 @@ def generate_workflow(self, load_order: List, filename: str = 'generated_code_wo inputs = {key: value for key, value in inputs.items() if key in class_def_params} # Create executed variable and generate code - executed_variables[idx] = f'{class_type.lower()}_{idx}' + executed_variables[idx] = f'{class_type.lower().strip()}_{idx}' inputs = self.update_inputs(inputs, executed_variables) if is_special_function: @@ -302,14 +302,16 @@ def assemble_python_code(self, import_statements: set, speical_functions_code: L Returns: str: Generated final code as a string. """ - # Get the source code of the function as a string - add_comfyui_directories_to_sys_path_code = inspect.getsource(add_comfyui_directories_to_sys_path) + # Get the source code of the utils functions as a string + func_strings = [] + for func in [add_comfyui_directory_to_sys_path, get_value_at_index]: + func_strings.append(f'\n{inspect.getsource(func)}') # Define static import statements required for the script - static_imports = ['import os', 'import random', 'import sys', 'import torch', f'\n{add_comfyui_directories_to_sys_path_code}', - '\n\nadd_comfyui_directories_to_sys_path()'] + static_imports = ['import os', 'import random', 'import sys', 'from typing import Sequence, Mapping, Any, Union', + 'import torch'] + func_strings + ['\n\nadd_comfyui_directory_to_sys_path()'] # Check if custom nodes should be included if custom_nodes: - static_imports.append('\nfrom utils import import_custom_nodes, get_value_at_index\n') + static_imports.append(f'\n{inspect.getsource(import_custom_nodes)}\n') custom_nodes = 'import_custom_nodes()\n\t' else: custom_nodes = '' @@ -336,9 +338,9 @@ def get_class_info(self, class_type: str) -> Tuple[str, str, str]: """ import_statement = class_type if class_type in self.base_node_class_mappings.keys(): - class_code = f'{class_type.lower()} = {class_type}()' + class_code = f'{class_type.lower().strip()} = {class_type.strip()}()' else: - class_code = f'{class_type.lower()} = NODE_CLASS_MAPPINGS["{class_type}"]()' + class_code = f'{class_type.lower().strip()} = NODE_CLASS_MAPPINGS["{class_type}"]()' return class_type, import_statement, class_code diff --git a/utils.py b/utils.py index 5a1820a..eaaa4db 100644 --- a/utils.py +++ b/utils.py @@ -1,15 +1,9 @@ -import asyncio -import json -import glob import os from typing import Sequence, Mapping, Any, Union import sys sys.path.append('../') -import execution -from nodes import init_custom_nodes - def import_custom_nodes() -> None: """Find all custom nodes 
in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS @@ -17,6 +11,9 @@ def import_custom_nodes() -> None: This function sets up a new asyncio event loop, initializes the PromptServer, creates a PromptQueue, and initializes the custom nodes. """ + import asyncio + import execution + from nodes import init_custom_nodes import server # Creating a new event loop and setting it as the default loop @@ -30,21 +27,19 @@ def import_custom_nodes() -> None: # Initializing custom nodes init_custom_nodes() - -def add_comfyui_directories_to_sys_path() -> None: +def add_comfyui_directory_to_sys_path() -> None: """ - Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI' and 'ComfyUI-to-Python-Extension'. - Once found, the directories are added to sys.path. + Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI'. + Once found, the directory is added to sys.path. """ start_path = os.getcwd() # Get the current working directory def search_directory(path: str) -> None: - # Check if the current directory contains 'ComfyUI' or 'ComfyUI-to-Python-Extension' - for directory_name in ['ComfyUI', 'ComfyUI-to-Python-Extension']: - if directory_name in os.listdir(path): - directory_path = os.path.join(path, directory_name) - sys.path.append(directory_path) - print(f"'{directory_name}' found and added to sys.path: {directory_path}") + # Check if the current directory contains 'ComfyUI' + if 'ComfyUI' in os.listdir(path): + directory_path = os.path.join(path, 'ComfyUI') + sys.path.append(directory_path) + print(f"ComfyUI found and added to sys.path: {directory_path}") # Get the parent directory parent_directory = os.path.dirname(path) From 95ed7d54262728ebad169646317dc0fd43142b44 Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 20:48:23 -0500 Subject: [PATCH 04/10] Added logic to deal with hidden variables. --- comfyui_to_python.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/comfyui_to_python.py b/comfyui_to_python.py index 4f7f48c..0e27353 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -4,6 +4,7 @@ import json import logging import os +import random import sys import black @@ -230,6 +231,9 @@ def generate_workflow(self, load_order: List, filename: str = 'generated_code_wo # Remove any keyword arguments from **inputs if they are not in class_def_params inputs = {key: value for key, value in inputs.items() if key in class_def_params} + # Deal with hidden variables + if 'unique_id' in class_def_params: + inputs['unique_id'] = random.randint(1, 2**64) # Create executed variable and generate code executed_variables[idx] = f'{class_type.lower().strip()}_{idx}' @@ -429,8 +433,8 @@ def execute(self): if __name__ == '__main__': # Update class parameters here - input_file = 'workflow_api_serge.json' - output_file = 'workflow_api_serge.py' + input_file = 'workflow_api_impact3.json' + output_file = 'workflow_api_impact3.py' queue_size = 10 # Convert ComfyUI workflow to Python From 203a675d6bf711c6a709f1ddd8f68c09a7f65a54 Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 20:49:16 -0500 Subject: [PATCH 05/10] Changed default parameters back. 
--- comfyui_to_python.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/comfyui_to_python.py b/comfyui_to_python.py index 0e27353..fa38ee1 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -433,8 +433,8 @@ def execute(self): if __name__ == '__main__': # Update class parameters here - input_file = 'workflow_api_impact3.json' - output_file = 'workflow_api_impact3.py' + input_file = 'workflow_api.json' + output_file = 'workflow_api.py' queue_size = 10 # Convert ComfyUI workflow to Python From c9a02f6eff560f5a291b6025621a51dc5353112a Mon Sep 17 00:00:00 2001 From: Peyton DeNiro Date: Sat, 19 Aug 2023 20:50:50 -0500 Subject: [PATCH 06/10] Update README.md --- README.md | 104 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index f2930a8..4032123 100644 --- a/README.md +++ b/README.md @@ -10,18 +10,74 @@ The `ComfyUI-to-Python-Extension` is a powerful tool that translates ComfyUI wor **To this:** ``` +import os import random -import torch import sys +from typing import Sequence, Mapping, Any, Union +import torch + + +def add_comfyui_directory_to_sys_path() -> None: + """ + Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI'. + Once found, the directory is added to sys.path. + """ + start_path = os.getcwd() # Get the current working directory + + def search_directory(path: str) -> None: + # Check if the current directory contains 'ComfyUI' + if "ComfyUI" in os.listdir(path): + directory_path = os.path.join(path, "ComfyUI") + sys.path.append(directory_path) + print(f"ComfyUI found and added to sys.path: {directory_path}") + + # Get the parent directory + parent_directory = os.path.dirname(path) + + # If the parent directory is the same as the current directory, we've reached the root and stop the search + if parent_directory == path: + return + + # Recursively call the function with the parent directory + search_directory(parent_directory) + + # Start the search from the current working directory + search_directory(start_path) + + +def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: + """Returns the value at the given index of a sequence or mapping. + + If the object is a sequence (like list or string), returns the value at the given index. + If the object is a mapping (like a dictionary), returns the value at the index-th key. + + Some return a dictionary, in these cases, we look for the "results" key -sys.path.append("../") + Args: + obj (Union[Sequence, Mapping]): The object to retrieve the value from. + index (int): The index of the value to retrieve. + + Returns: + Any: The value at the given index. + + Raises: + IndexError: If the index is out of bounds for the object and the object is not a mapping. 
+ """ + try: + return obj[index] + except KeyError: + return obj["result"][index] + + +add_comfyui_directory_to_sys_path() from nodes import ( - VAEDecode, + CLIPTextEncode, KSamplerAdvanced, - EmptyLatentImage, - SaveImage, CheckpointLoaderSimple, - CLIPTextEncode, + VAEDecode, + SaveImage, + EmptyLatentImage, + NODE_CLASS_MAPPINGS, ) @@ -39,12 +95,12 @@ def main(): cliptextencode = CLIPTextEncode() cliptextencode_6 = cliptextencode.encode( - text="evening sunset scenery blue sky nature, glass bottle with a galaxy in it", - clip=checkpointloadersimple_4[1], + text="Kylo Ren trapped inside of a Mark Rothko painting", + clip=get_value_at_index(checkpointloadersimple_4, 1), ) cliptextencode_7 = cliptextencode.encode( - text="text, watermark", clip=checkpointloadersimple_4[1] + text="text, watermark", clip=get_value_at_index(checkpointloadersimple_4, 1) ) checkpointloadersimple_12 = checkpointloadersimple.load_checkpoint( @@ -52,12 +108,13 @@ def main(): ) cliptextencode_15 = cliptextencode.encode( - text="evening sunset scenery blue sky nature, glass bottle with a galaxy in it", - clip=checkpointloadersimple_12[1], + text="Kylo Ren trapped inside of a Mark Rothko painting", + clip=get_value_at_index(checkpointloadersimple_12, 1), ) cliptextencode_16 = cliptextencode.encode( - text="text, watermark", clip=checkpointloadersimple_12[1] + text="text, watermark", + clip=get_value_at_index(checkpointloadersimple_12, 1), ) ksampleradvanced = KSamplerAdvanced() @@ -75,10 +132,10 @@ def main(): start_at_step=0, end_at_step=20, return_with_leftover_noise="enable", - model=checkpointloadersimple_4[0], - positive=cliptextencode_6[0], - negative=cliptextencode_7[0], - latent_image=emptylatentimage_5[0], + model=get_value_at_index(checkpointloadersimple_4, 0), + positive=get_value_at_index(cliptextencode_6, 0), + negative=get_value_at_index(cliptextencode_7, 0), + latent_image=get_value_at_index(emptylatentimage_5, 0), ) ksampleradvanced_11 = ksampleradvanced.sample( @@ -91,18 +148,19 @@ def main(): start_at_step=20, end_at_step=10000, return_with_leftover_noise="disable", - model=checkpointloadersimple_12[0], - positive=cliptextencode_15[0], - negative=cliptextencode_16[0], - latent_image=ksampleradvanced_10[0], + model=get_value_at_index(checkpointloadersimple_12, 0), + positive=get_value_at_index(cliptextencode_15, 0), + negative=get_value_at_index(cliptextencode_16, 0), + latent_image=get_value_at_index(ksampleradvanced_10, 0), ) vaedecode_17 = vaedecode.decode( - samples=ksampleradvanced_11[0], vae=checkpointloadersimple_12[2] + samples=get_value_at_index(ksampleradvanced_11, 0), + vae=get_value_at_index(checkpointloadersimple_12, 2), ) saveimage_19 = saveimage.save_images( - filename_prefix="ComfyUI", images=vaedecode_17[0] + filename_prefix="ComfyUI", images=get_value_at_index(vaedecode_17, 0) ) @@ -168,4 +226,4 @@ if __name__ == "__main__": 9. After running `comfyui_to_python.py`, a new .py file will be created in the current working directory that contains the same name as the `input` variable. If you made no changes, look for `workflow_api.py`. -10. Now you can execute the newly created .py file to generate images without launching a server. \ No newline at end of file +10. Now you can execute the newly created .py file to generate images without launching a server. 
From b58cb3c189d18ebbdcf654374f7adaa844f946dc Mon Sep 17 00:00:00 2001 From: Peyton DeNiro Date: Sat, 19 Aug 2023 21:05:28 -0500 Subject: [PATCH 07/10] Update README.md --- README.md | 102 ++++++++++++------------------------------------------ 1 file changed, 22 insertions(+), 80 deletions(-) diff --git a/README.md b/README.md index 4032123..5afbcf3 100644 --- a/README.md +++ b/README.md @@ -10,74 +10,18 @@ The `ComfyUI-to-Python-Extension` is a powerful tool that translates ComfyUI wor **To this:** ``` -import os import random -import sys -from typing import Sequence, Mapping, Any, Union import torch +import sys - -def add_comfyui_directory_to_sys_path() -> None: - """ - Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI'. - Once found, the directory is added to sys.path. - """ - start_path = os.getcwd() # Get the current working directory - - def search_directory(path: str) -> None: - # Check if the current directory contains 'ComfyUI' - if "ComfyUI" in os.listdir(path): - directory_path = os.path.join(path, "ComfyUI") - sys.path.append(directory_path) - print(f"ComfyUI found and added to sys.path: {directory_path}") - - # Get the parent directory - parent_directory = os.path.dirname(path) - - # If the parent directory is the same as the current directory, we've reached the root and stop the search - if parent_directory == path: - return - - # Recursively call the function with the parent directory - search_directory(parent_directory) - - # Start the search from the current working directory - search_directory(start_path) - - -def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: - """Returns the value at the given index of a sequence or mapping. - - If the object is a sequence (like list or string), returns the value at the given index. - If the object is a mapping (like a dictionary), returns the value at the index-th key. - - Some return a dictionary, in these cases, we look for the "results" key - - Args: - obj (Union[Sequence, Mapping]): The object to retrieve the value from. - index (int): The index of the value to retrieve. - - Returns: - Any: The value at the given index. - - Raises: - IndexError: If the index is out of bounds for the object and the object is not a mapping. 
- """ - try: - return obj[index] - except KeyError: - return obj["result"][index] - - -add_comfyui_directory_to_sys_path() +sys.path.append("../") from nodes import ( - CLIPTextEncode, - KSamplerAdvanced, - CheckpointLoaderSimple, VAEDecode, - SaveImage, + KSamplerAdvanced, EmptyLatentImage, - NODE_CLASS_MAPPINGS, + SaveImage, + CheckpointLoaderSimple, + CLIPTextEncode, ) @@ -95,12 +39,12 @@ def main(): cliptextencode = CLIPTextEncode() cliptextencode_6 = cliptextencode.encode( - text="Kylo Ren trapped inside of a Mark Rothko painting", - clip=get_value_at_index(checkpointloadersimple_4, 1), + text="evening sunset scenery blue sky nature, glass bottle with a galaxy in it", + clip=checkpointloadersimple_4[1], ) cliptextencode_7 = cliptextencode.encode( - text="text, watermark", clip=get_value_at_index(checkpointloadersimple_4, 1) + text="text, watermark", clip=checkpointloadersimple_4[1] ) checkpointloadersimple_12 = checkpointloadersimple.load_checkpoint( @@ -108,13 +52,12 @@ def main(): ) cliptextencode_15 = cliptextencode.encode( - text="Kylo Ren trapped inside of a Mark Rothko painting", - clip=get_value_at_index(checkpointloadersimple_12, 1), + text="evening sunset scenery blue sky nature, glass bottle with a galaxy in it", + clip=checkpointloadersimple_12[1], ) cliptextencode_16 = cliptextencode.encode( - text="text, watermark", - clip=get_value_at_index(checkpointloadersimple_12, 1), + text="text, watermark", clip=checkpointloadersimple_12[1] ) ksampleradvanced = KSamplerAdvanced() @@ -132,10 +75,10 @@ def main(): start_at_step=0, end_at_step=20, return_with_leftover_noise="enable", - model=get_value_at_index(checkpointloadersimple_4, 0), - positive=get_value_at_index(cliptextencode_6, 0), - negative=get_value_at_index(cliptextencode_7, 0), - latent_image=get_value_at_index(emptylatentimage_5, 0), + model=checkpointloadersimple_4[0], + positive=cliptextencode_6[0], + negative=cliptextencode_7[0], + latent_image=emptylatentimage_5[0], ) ksampleradvanced_11 = ksampleradvanced.sample( @@ -148,19 +91,18 @@ def main(): start_at_step=20, end_at_step=10000, return_with_leftover_noise="disable", - model=get_value_at_index(checkpointloadersimple_12, 0), - positive=get_value_at_index(cliptextencode_15, 0), - negative=get_value_at_index(cliptextencode_16, 0), - latent_image=get_value_at_index(ksampleradvanced_10, 0), + model=checkpointloadersimple_12[0], + positive=cliptextencode_15[0], + negative=cliptextencode_16[0], + latent_image=ksampleradvanced_10[0], ) vaedecode_17 = vaedecode.decode( - samples=get_value_at_index(ksampleradvanced_11, 0), - vae=get_value_at_index(checkpointloadersimple_12, 2), + samples=ksampleradvanced_11[0], vae=checkpointloadersimple_12[2] ) saveimage_19 = saveimage.save_images( - filename_prefix="ComfyUI", images=get_value_at_index(vaedecode_17, 0) + filename_prefix="ComfyUI", images=vaedecode_17[0] ) From e6587d82f7cf99ba2af7cd923a8069e36569232d Mon Sep 17 00:00:00 2001 From: Peyton DeNiro Date: Sat, 19 Aug 2023 21:14:37 -0500 Subject: [PATCH 08/10] Update README.md --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 5afbcf3..e73e9b0 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,11 @@ if __name__ == "__main__": main() ``` +## V1.0.0 Release Notes +- **Use all the custom nodes!** + - Custom nodes are now supported. If you run into any issues with code execution, first ensure that the each node works as expected in the GUI. If it works in the GUI, but not in the generated script, please submit an issue. 
+ + ## Usage From 7a85aade182472763347329d2b5cb07ce86fdf8d Mon Sep 17 00:00:00 2001 From: Peyton Date: Sat, 19 Aug 2023 21:15:30 -0500 Subject: [PATCH 09/10] Updated import order. --- comfyui_to_python.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/comfyui_to_python.py b/comfyui_to_python.py index fa38ee1..a5f4a83 100644 --- a/comfyui_to_python.py +++ b/comfyui_to_python.py @@ -6,14 +6,12 @@ import os import random import sys +from typing import Dict, List, Any, Callable, Tuple import black -from typing import Dict, List, Any, Callable, Tuple - from utils import import_custom_nodes, add_comfyui_directory_to_sys_path, get_value_at_index - sys.path.append('../') from nodes import NODE_CLASS_MAPPINGS From d86c4807979a2571f5d0a1431fc47aa12f93113b Mon Sep 17 00:00:00 2001 From: Peyton DeNiro Date: Sat, 19 Aug 2023 21:18:29 -0500 Subject: [PATCH 10/10] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e73e9b0..00b30c4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## ComfyUI-to-Python-Extension -The `ComfyUI-to-Python-Extension` is a powerful tool that translates ComfyUI workflows into executable Python code. Designed to bridge the gap between ComfyUI's visual interface and Python's programming environment, this script facilitates the seamless transition from design to code execution. Whether you're a data scientist, a software developer, or an AI enthusiast, this tool streamlines the process of implementing ComfyUI workflows in Python. The output makes it easy to queue a large amount of images for generation and provides a base script to easily modify for experimination. +The `ComfyUI-to-Python-Extension` is a powerful tool that translates [ComfyUI](https://github.com/comfyanonymous/ComfyUI) workflows into executable Python code. Designed to bridge the gap between ComfyUI's visual interface and Python's programming environment, this script facilitates the seamless transition from design to code execution. Whether you're a data scientist, a software developer, or an AI enthusiast, this tool streamlines the process of implementing ComfyUI workflows in Python. The output makes it easy to queue a large amount of images for generation and provides a base script to easily modify for experimination. **Convert this:**