Spaces:
Running
Running
| from abc import ABC, abstractmethod | |
| from CodeParser import CodeParser | |
| from utils import count_tokens | |
class Chunker(ABC):
    """Abstract base class for strategies that split content into chunks.

    Subclasses implement ``chunk`` and ``get_chunk``. The static helpers
    operate on the ``{chunk_number: chunk_text}`` mapping that ``chunk``
    is expected to return.
    """

    def __init__(self, encoding_name="gpt-4"):
        """Remember the encoding name subclasses use when counting tokens.

        Args:
            encoding_name: Identifier stored on the instance as
                ``self.encoding_name``.
        """
        self.encoding_name = encoding_name

    @abstractmethod
    def chunk(self, content, token_limit):
        """Split ``content`` into chunks bounded by ``token_limit``.

        Must be overridden; the base class provides no implementation.
        """

    @abstractmethod
    def get_chunk(self, chunked_content, chunk_number):
        """Return one chunk of ``chunked_content`` selected by number.

        Must be overridden; the base class provides no implementation.
        """

    @staticmethod
    def print_chunks(chunks):
        """Print every chunk to stdout, framed by 40-character rules.

        Args:
            chunks: Mapping of chunk number to chunk text.
        """
        rule = "=" * 40
        for chunk_number, chunk_code in chunks.items():
            print(f"Chunk {chunk_number}:")
            print(rule)
            print(chunk_code)
            print(rule)

    @staticmethod
    def consolidate_chunks_into_file(chunks):
        """Join all chunk texts, in mapping order, with newlines.

        Args:
            chunks: Mapping of chunk number to chunk text (str values).

        Returns:
            A single string; empty when ``chunks`` is empty.
        """
        return "\n".join(chunks.values())

    @staticmethod
    def count_lines(consolidated_chunks):
        """Count newline-separated lines in ``consolidated_chunks``.

        Returns:
            Number of pieces produced by splitting on ``"\\n"``; an empty
            string therefore counts as one line.
        """
        return len(consolidated_chunks.split("\n"))
class CodeChunker(Chunker):
    """Split source code into chunks that try to respect a token budget.

    Chunks are cut only at "points of interest" reported by ``CodeParser``.
    Each cut point is first moved up so that a run of comment lines sitting
    directly above it stays in the same chunk as the code it precedes.
    """

    def __init__(self, file_extension, encoding_name="gpt-4"):
        """Store the file extension and the token-counting encoding name.

        Args:
            file_extension: Passed to ``CodeParser`` and to its line-lookup
                methods when chunking.
            encoding_name: Passed to ``count_tokens`` for every line.
        """
        super().__init__(encoding_name)
        self.file_extension = file_extension

    @staticmethod
    def _attach_leading_comments(breakpoints, comments):
        """Move each breakpoint up to the top of the comment run above it.

        Args:
            breakpoints: Iterable of line indices where a chunk may begin.
            comments: Iterable of line indices that contain comments.

        Returns:
            Sorted list of unique, adjusted breakpoint line indices.
        """
        comment_lines = set(comments)
        adjusted = set()
        for bp in breakpoints:
            top_comment = None
            probe = bp - 1
            # Walk upwards while the line directly above is a comment.
            while probe in comment_lines:
                top_comment = probe
                probe -= 1
            # Compare with None explicitly: line index 0 is a valid comment
            # line but is falsy, so a truthiness test would discard it.
            adjusted.add(bp if top_comment is None else top_comment)
        return sorted(adjusted)

    def chunk(self, code, token_limit) -> dict:
        """Split ``code`` into numbered chunks cut at parser breakpoints.

        Lines are accumulated until adding the next one would exceed
        ``token_limit`` (per-line counts from ``count_tokens``). The chunk is
        then cut at the current line if it is a breakpoint, otherwise at the
        last breakpoint before it. If no breakpoint lies after the start of
        the current chunk, the chunk keeps growing past the limit until one
        is reached.

        Args:
            code: Source text; split on ``"\\n"``.
            token_limit: Target maximum token count per chunk.

        Returns:
            Dict mapping chunk numbers (starting at 1) to chunk text.
            Whitespace-only chunks are skipped.
        """
        code_parser = CodeParser(self.file_extension)
        lines = code.split("\n")

        # Breakpoints are compared directly with 0-based indices into
        # ``lines`` below.
        breakpoints = self._attach_leading_comments(
            code_parser.get_lines_for_points_of_interest(code, self.file_extension),
            code_parser.get_lines_for_comments(code, self.file_extension),
        )
        breakpoint_set = set(breakpoints)

        chunks = {}
        chunk_number = 1
        start_line = 0
        token_count = 0
        i = 0

        while i < len(lines):
            line_tokens = count_tokens(lines[i], self.encoding_name)

            if token_count + line_tokens <= token_limit:
                # Still within budget: take the line.
                token_count += line_tokens
                i += 1
                continue

            # Over budget: pick where to cut, never before the chunk start.
            if i in breakpoint_set:
                stop_line = i
            else:
                stop_line = max(
                    max((bp for bp in breakpoints if bp < i), default=start_line),
                    start_line,
                )

            if stop_line == start_line:
                # No breakpoint inside the current chunk yet, so cutting here
                # would yield an empty chunk. Accept the overflow and go on.
                token_count += line_tokens
                i += 1
                continue

            chunk_text = "\n".join(lines[start_line:stop_line])
            if chunk_text.strip():
                chunks[chunk_number] = chunk_text
                chunk_number += 1

            # Restart from the cut line; lines after it are re-counted.
            i = stop_line
            start_line = stop_line
            token_count = 0

        # Whatever is left after the last cut forms the final chunk.
        remainder = "\n".join(lines[start_line:])
        if remainder.strip():
            chunks[chunk_number] = remainder

        return chunks

    def get_chunk(self, chunked_codebase, chunk_number):
        """Return the chunk stored under ``chunk_number``.

        Raises:
            KeyError: If ``chunked_codebase`` is a dict without that key.
        """
        return chunked_codebase[chunk_number]