# app.py (Complete Final Version - Fixed SyntaxError in overlap loop)
"""Gradio app for car damage detection.

Pipeline:
    1. CLIP decides whether the uploaded image shows a car.
    2. Two YOLOv8 segmentation models detect damages and car parts.
    3. Each damage is assigned to the part whose mask covers the largest
       fraction of the damage mask (if that fraction reaches the threshold).
    4. Masks and labelled boxes are drawn on a copy of the image.

Models are loaded once at import time; a failed load is recorded in the
corresponding ``*_load_error_msg`` global and reported at request time.
"""
import gradio as gr
import torch
import clip
from PIL import Image
import numpy as np
import os
import cv2
import gc  # Garbage collector
import logging
import random  # For mask colors
import time  # For timing checks
import traceback  # For detailed error printing

# --- YOLOv8 Imports ---
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator  # For drawing YOLO results

# --- Setup Logging ---
logging.getLogger("ultralytics").setLevel(logging.WARNING)  # Reduce YOLO logging noise
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
# Damage segmentation classes (Order MUST match the training of 'model_best.pt')
DAMAGE_CLASSES = [
    'Cracked', 'Scratch', 'Flaking', 'Broken part',
    'Corrosion', 'Dent', 'Paint chip', 'Missing part',
]
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)

# Part segmentation classes (Order MUST match the training of 'partdetection_yolobest.pt')
CAR_PART_CLASSES = [
    "Quarter-panel", "Front-wheel", "Back-window", "Trunk", "Front-door",
    "Rocker-panel", "Grille", "Windshield", "Front-window", "Back-door",
    "Headlight", "Back-wheel", "Back-windshield", "Hood", "Fender",
    "Tail-light", "License-plate", "Front-bumper", "Back-bumper", "Mirror",
    "Roof",
]
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)

# Paths within the Hugging Face Space repository (relative to app.py)
CLIP_TEXT_FEATURES_PATH = "./clip_text_features.pt"
DAMAGE_MODEL_WEIGHTS_PATH = "./model_best.pt"  # Your YOLOv8 damage model weights
PART_MODEL_WEIGHTS_PATH = "./partdetection_yolobest.pt"  # Your YOLOv8 part model weights

# Default Prediction Thresholds (can be overridden by sliders)
DEFAULT_DAMAGE_PRED_THRESHOLD = 0.4
DEFAULT_PART_PRED_THRESHOLD = 0.3

# Damage-to-part assignment parameters
OVERLAP_THRESHOLD = 0.4  # Minimum fraction of a damage mask a part must cover
MIN_DAMAGE_AREA_PX = 10  # Damage masks smaller than this are ignored
UNASSIGNED_PART_NAME = "Unknown / Outside Parts"

# Drawing parameters. The working image is BGR (OpenCV order), so colors are BGR too.
PART_BOX_COLOR_BGR = (0, 200, 0)  # Green
DAMAGE_BOX_COLOR_BGR = (0, 0, 200)  # Red
PART_MASK_ALPHA = 0.3
DAMAGE_MASK_ALPHA = 0.4

# --- Device Setup ---
if torch.cuda.is_available():
    DEVICE = "cuda"
    logger.info("CUDA available, using GPU.")
else:
    DEVICE = "cpu"
    logger.info("CUDA not available, using CPU.")

# --- MODEL LOADING (Load models globally ONCE on startup) ---
print("--- Initializing Models ---")
clip_model = None
clip_preprocess = None
clip_text_features = None
damage_model = None
part_model = None
clip_load_error_msg = None
damage_load_error_msg = None
part_load_error_msg = None

# --- Load CLIP Model (Model 1) ---
try:
    logger.info("Loading CLIP model (ViT-B/16)...")
    clip_model, clip_preprocess = clip.load("ViT-B/16", device=DEVICE, jit=False)
    clip_model.eval()
    logger.info("CLIP model loaded.")
    logger.info(f"Loading CLIP text features from {CLIP_TEXT_FEATURES_PATH}...")
    if not os.path.exists(CLIP_TEXT_FEATURES_PATH):
        raise FileNotFoundError(f"CLIP text features not found: {CLIP_TEXT_FEATURES_PATH}.")
    # NOTE(review): torch.load unpickles the file; only ship a trusted features file.
    clip_text_features = torch.load(CLIP_TEXT_FEATURES_PATH, map_location=DEVICE)
    logger.info(f"CLIP text features loaded (dtype: {clip_text_features.dtype}).")
except Exception as e:
    clip_load_error_msg = f"CLIP load error: {e}"
    logger.error(clip_load_error_msg, exc_info=True)
    clip_model = None

# --- Load Damage Segmentation Model (Model 2 - YOLOv8) ---
try:
    logger.info(f"Loading Damage Segmentation (YOLOv8) model from {DAMAGE_MODEL_WEIGHTS_PATH}...")
    if not os.path.exists(DAMAGE_MODEL_WEIGHTS_PATH):
        raise FileNotFoundError(f"Damage model weights not found: {DAMAGE_MODEL_WEIGHTS_PATH}.")
    damage_model = YOLO(DAMAGE_MODEL_WEIGHTS_PATH)
    damage_model.to(DEVICE)
    # Verify/Update class names: the names stored in the weights file win.
    loaded_damage_names = list(damage_model.names.values())
    if loaded_damage_names != DAMAGE_CLASSES:
        logger.warning(f"Mismatch: Defined DAMAGE_CLASSES vs names in {DAMAGE_MODEL_WEIGHTS_PATH}")
        DAMAGE_CLASSES = loaded_damage_names  # Use names from model file
        logger.warning(f"Updated DAMAGE_CLASSES to: {DAMAGE_CLASSES}")
    logger.info("Damage Segmentation (YOLOv8) model loaded.")
except Exception as e:
    damage_load_error_msg = f"Damage YOLO load error: {e}"
    logger.error(damage_load_error_msg, exc_info=True)
    damage_model = None

# --- Load Part Segmentation Model (Model 3 - YOLOv8) ---
try:
    logger.info(f"Loading Part Segmentation (YOLOv8) model from {PART_MODEL_WEIGHTS_PATH}...")
    if not os.path.exists(PART_MODEL_WEIGHTS_PATH):
        raise FileNotFoundError(f"Part model weights not found: {PART_MODEL_WEIGHTS_PATH}.")
    part_model = YOLO(PART_MODEL_WEIGHTS_PATH)
    part_model.to(DEVICE)
    # Verify/Update class names: the names stored in the weights file win.
    loaded_part_names = list(part_model.names.values())
    if loaded_part_names != CAR_PART_CLASSES:
        logger.warning(f"Mismatch: Defined CAR_PART_CLASSES vs names in {PART_MODEL_WEIGHTS_PATH}")
        CAR_PART_CLASSES = loaded_part_names  # Use names from model file
        logger.warning(f"Updated CAR_PART_CLASSES to: {CAR_PART_CLASSES}")
    logger.info("Part Segmentation (YOLOv8) model loaded.")
except Exception as e:
    part_load_error_msg = f"Part YOLO load error: {e}"
    logger.error(part_load_error_msg, exc_info=True)
    part_model = None

print("--- Model loading process finished. ---")
if clip_load_error_msg:
    print(f"CLIP STATUS: {clip_load_error_msg}")
else:
    print("CLIP STATUS: Loaded OK.")
if damage_load_error_msg:
    print(f"DAMAGE MODEL STATUS: {damage_load_error_msg}")
else:
    print("DAMAGE MODEL STATUS: Loaded OK.")
if part_load_error_msg:
    print(f"PART MODEL STATUS: {part_load_error_msg}")
else:
    print("PART MODEL STATUS: Loaded OK.")


# --- Prediction Functions ---
def classify_image_clip(image_pil):
    """Classify a PIL image as "Car" / "Not Car" using CLIP.

    Args:
        image_pil: PIL image; converted to RGB if it is in another mode.

    Returns:
        ``(label, probabilities)``. On success ``label`` is "Car" or
        "Not Car" and ``probabilities`` maps both labels to floats rounded to
        three decimals. On failure ``label`` is an error message and
        ``probabilities`` is ``{"Error": 1.0}``.
    """
    if clip_model is None or clip_text_features is None:
        logger.error(f"CLIP model or text features not loaded. Error: {clip_load_error_msg}")
        return "Error: CLIP Model Not Loaded", {"Error": 1.0}

    logger.info("Running CLIP classification...")
    try:
        if image_pil.mode != "RGB":
            image_pil = image_pil.convert("RGB")
        t1 = time.time()
        image_input = clip_preprocess(image_pil).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            image_features = clip_model.encode_image(image_input)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            text_features_matched = clip_text_features
            if image_features.dtype != clip_text_features.dtype:
                logger.warning(
                    f"CLIP Dtype mismatch! Image: {image_features.dtype}, "
                    f"Text: {clip_text_features.dtype}. Converting text..."
                )
                text_features_matched = clip_text_features.to(image_features.dtype)
            logit_scale = clip_model.logit_scale.exp()
            similarity = (image_features @ text_features_matched.T) * logit_scale
            probs = similarity.softmax(dim=-1).squeeze().cpu()
        t2 = time.time()
        logger.info(f"CLIP processing time: {t2-t1:.2f}s")

        # Index 0 is read as "Car" and index 1 as "Not Car".
        # NOTE(review): this assumes the saved text features are in that order — confirm.
        car_prob = probs[0].item()
        not_car_prob = probs[1].item()
        predicted_label = "Car" if car_prob > not_car_prob else "Not Car"
        # Floats (not formatted strings): the dict feeds a gr.Label, and the
        # error paths above already return float confidences.
        prob_dict = {"Car": round(car_prob, 3), "Not Car": round(not_car_prob, 3)}
        return predicted_label, prob_dict
    except Exception as e:
        logger.error(f"Error during CLIP prediction function: {e}", exc_info=True)
        traceback.print_exc()
        return "Error during CLIP processing", {"Error": 1.0}


def _unpack_result(result):
    """Return ``(masks, class_ids, boxes_xyxy)`` from one YOLO result, with empty fallbacks."""
    if result.masks is not None:
        masks_raw = result.masks.data
    else:
        masks_raw = torch.empty((0, 0, 0), device=DEVICE)
    if result.boxes is not None:
        class_ids = result.boxes.cls.cpu().numpy().astype(int)
        boxes_xyxy = result.boxes.xyxy.cpu()
    else:
        class_ids = np.array([], dtype=int)
        boxes_xyxy = torch.empty((0, 4))
    return masks_raw, class_ids, boxes_xyxy


def _resize_masks(masks_tensor, target_h, target_w):
    """Return the masks as a bool array of shape (n, target_h, target_w), resizing on CPU if needed."""
    masks_np_bool = masks_tensor.cpu().numpy().astype(bool)
    empty = np.zeros((0, target_h, target_w), dtype=bool)
    if masks_np_bool.shape[0] == 0:
        return empty
    if masks_np_bool.ndim == 2:
        masks_np_bool = np.expand_dims(masks_np_bool, axis=0)
    if masks_np_bool.ndim != 3:
        logger.error(f"Unexpected mask dim: {masks_np_bool.ndim}")
        return empty
    if masks_np_bool.shape[1:] == (target_h, target_w):
        return masks_np_bool
    # Nearest-neighbour keeps the masks binary.
    return np.array([
        cv2.resize(
            mask.astype(np.uint8), (target_w, target_h), interpolation=cv2.INTER_NEAREST
        ).astype(bool)
        for mask in masks_np_bool
    ])


def _assign_damages_to_parts(damage_masks, damage_class_ids, part_masks, part_class_ids):
    """Describe, for each damage mask, the part that covers the largest share of it."""
    assignments = []
    for damage_mask, damage_class_id in zip(damage_masks, damage_class_ids):
        try:
            damage_name = DAMAGE_CLASSES[damage_class_id]
        except IndexError:
            logger.warning(
                f"Invalid damage ID {damage_class_id} found during overlap check. "
                "Skipping this damage."
            )
            continue

        damage_area = int(np.sum(damage_mask))
        if damage_area < MIN_DAMAGE_AREA_PX:  # Skip tiny masks
            continue

        # Find the part with the largest overlap ratio for this damage.
        max_overlap = 0.0
        best_part_name = None
        for part_mask, part_class_id in zip(part_masks, part_class_ids):
            try:
                part_name = CAR_PART_CLASSES[part_class_id]
            except IndexError:
                logger.warning(f"Invalid part ID {part_class_id} during overlap check.")
                continue
            intersection = np.logical_and(damage_mask, part_mask)
            overlap_ratio = np.sum(intersection) / damage_area
            if overlap_ratio > max_overlap:
                max_overlap = overlap_ratio
                best_part_name = part_name

        if best_part_name is not None and max_overlap >= OVERLAP_THRESHOLD:
            assigned_part_name = best_part_name
        else:
            assigned_part_name = UNASSIGNED_PART_NAME

        assignment_desc = f"{damage_name} in {assigned_part_name}"
        if assigned_part_name == UNASSIGNED_PART_NAME:
            if max_overlap > 0:  # Some overlap, but below the threshold
                assignment_desc += f" (Max Overlap < {OVERLAP_THRESHOLD*100:.0f}%)"
            else:
                assignment_desc += " (No overlap)"
        assignments.append(assignment_desc)
    return assignments


def _blend_masks(image_bgr, masks, colors_bgr, alpha):
    """Alpha-blend each boolean mask onto ``image_bgr`` in place with its BGR color."""
    if masks.ndim != 3 or masks.shape[1:] != image_bgr.shape[:2]:
        if masks.shape[0] > 0:
            logger.warning(f"Mask shape {masks.shape} does not match image; masks not drawn.")
        return
    for mask, color in zip(masks, colors_bgr):
        if not mask.any():
            continue
        region = image_bgr[mask].astype(np.float32)
        blended = region * (1.0 - alpha) + np.asarray(color, dtype=np.float32) * alpha
        image_bgr[mask] = blended.astype(np.uint8)


def _label_boxes(annotator, boxes_xyxy, class_ids, class_names, color_bgr, kind):
    """Draw one labelled box per detection; ``kind`` only appears in the warning text."""
    for box, cls_id in zip(boxes_xyxy, class_ids):
        try:
            label = f"{class_names[cls_id]}"
        except IndexError:
            logger.warning(f"Invalid {kind} ID {cls_id} during drawing")
            continue
        annotator.box_label(box, label=label, color=color_bgr)


# --- CORRECTED process_car_image Function ---
def process_car_image(image_np_bgr, damage_threshold, part_threshold):
    """Run damage and part segmentation (YOLOv8), calculate overlap, visualize.

    Args:
        image_np_bgr: Input image as a BGR numpy array (H, W, 3).
        damage_threshold: Confidence threshold passed to the damage model.
        part_threshold: Confidence threshold passed to the part model.

    Returns:
        ``(combined_image_rgb, assignment_text)``: the annotated image as an
        RGB numpy array and a newline-separated list of damage assignments
        (or a status / error message). If a model is not loaded, the
        unmodified image is returned with an error message.
    """
    # Check model availability
    if damage_model is None:
        return (
            cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB),
            f"Error: Damage model not loaded ({damage_load_error_msg})",
        )
    if part_model is None:
        return (
            cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB),
            f"Error: Part model not loaded ({part_load_error_msg})",
        )

    final_assignments = []
    annotated_image_bgr = image_np_bgr.copy()  # Work on a copy
    img_h, img_w = image_np_bgr.shape[:2]
    logger.info("Starting combined YOLO processing...")
    yolo_start_time = time.time()

    try:
        # --- 1. Predict Damages (YOLOv8) ---
        # retina_masks=True requests masks at the original image resolution, so
        # they line up with the image without undoing the letterbox padding.
        logger.info(f" Running Damage Segmentation (Threshold: {damage_threshold})...")
        damage_result = damage_model.predict(
            image_np_bgr, verbose=False, device=DEVICE,
            conf=damage_threshold, retina_masks=True,
        )[0]
        damage_masks_raw, damage_class_ids, damage_boxes_xyxy = _unpack_result(damage_result)
        logger.info(f" Found {len(damage_class_ids)} potential damages.")

        # --- 2. Predict Parts (YOLOv8) ---
        logger.info(f" Running Part Segmentation (Threshold: {part_threshold})...")
        part_result = part_model.predict(
            image_np_bgr, verbose=False, device=DEVICE,
            conf=part_threshold, retina_masks=True,
        )[0]
        part_masks_raw, part_class_ids, part_boxes_xyxy = _unpack_result(part_result)
        logger.info(f" Found {len(part_class_ids)} potential parts.")
        yolo_end_time = time.time()
        logger.info(f" YOLO predictions took {yolo_end_time - yolo_start_time:.2f}s")

        # --- 3. Resize Masks (fallback; a no-op when masks already match the image) ---
        resize_start_time = time.time()
        damage_masks_np = _resize_masks(damage_masks_raw, img_h, img_w)
        part_masks_np = _resize_masks(part_masks_raw, img_h, img_w)
        resize_end_time = time.time()
        logger.info(f" Mask resizing took {resize_end_time - resize_start_time:.2f}s")

        # --- 4. Calculate Overlap ---
        logger.info(" Calculating overlap...")
        overlap_start_time = time.time()
        has_damages = damage_masks_np.shape[0] > 0
        has_parts = part_masks_np.shape[0] > 0
        if has_damages and has_parts:
            final_assignments.extend(
                _assign_damages_to_parts(
                    damage_masks_np, damage_class_ids, part_masks_np, part_class_ids
                )
            )
        elif has_damages:
            final_assignments.append(
                f"{len(damage_masks_np)} damages found, but no parts detected/matched "
                f"above threshold {part_threshold}."
            )
        elif has_parts:
            final_assignments.append(f"No damages detected above threshold {damage_threshold}.")
        else:
            final_assignments.append("No damages or parts detected above thresholds.")
        overlap_end_time = time.time()
        logger.info(f" Overlap calculation took {overlap_end_time - overlap_start_time:.2f}s")
        logger.info(f" Assignment results: {final_assignments}")

        # --- 5. Visualization ---
        # A drawing failure is logged but must not discard the assignments above.
        logger.info(" Visualizing results...")
        vis_start_time = time.time()
        try:
            # Masks first (parts greenish, then damages reddish on top), boxes after,
            # so labels are not tinted by the mask overlay.
            part_colors = [(0, random.randint(100, 200), 0) for _ in part_class_ids]
            damage_colors = [(0, 0, random.randint(100, 200)) for _ in damage_class_ids]
            _blend_masks(annotated_image_bgr, part_masks_np, part_colors, PART_MASK_ALPHA)
            _blend_masks(annotated_image_bgr, damage_masks_np, damage_colors, DAMAGE_MASK_ALPHA)

            annotator = Annotator(annotated_image_bgr, line_width=2, example=part_model.names)
            if part_result.masks is not None:
                _label_boxes(
                    annotator, part_boxes_xyxy, part_class_ids,
                    CAR_PART_CLASSES, PART_BOX_COLOR_BGR, "part",
                )
            if damage_result.masks is not None:
                _label_boxes(
                    annotator, damage_boxes_xyxy, damage_class_ids,
                    DAMAGE_CLASSES, DAMAGE_BOX_COLOR_BGR, "damage",
                )
            annotated_image_bgr = annotator.result()  # Get final BGR image
        except Exception as e_vis:
            logger.error(f"Error drawing masks/boxes: {e_vis}", exc_info=True)
        vis_end_time = time.time()
        logger.info(f" Visualization took {vis_end_time - vis_start_time:.2f}s")

    except Exception as e:
        logger.error(f"Error during combined processing: {e}", exc_info=True)
        traceback.print_exc()
        final_assignments.append("Error during segmentation/processing.")

    # --- Prepare output ---
    if final_assignments:
        assignment_text = "\n".join(final_assignments)
    else:
        assignment_text = "No damage assignments generated."
    # Convert final annotated image to RGB for Gradio display
    final_output_image_rgb = cv2.cvtColor(annotated_image_bgr, cv2.COLOR_BGR2RGB)
    return final_output_image_rgb, assignment_text


# --- Main Gradio Function ---
def predict_pipeline(image_np_input, damage_thresh, part_thresh):
    """Main pipeline: Classify -> Segment -> Assign -> Visualize.

    Args:
        image_np_input: RGB numpy image from Gradio, or None if nothing was uploaded.
        damage_thresh: Confidence threshold for the damage model.
        part_thresh: Confidence threshold for the part model.

    Returns:
        ``(classification_result, probabilities, output_image_rgb, assignment_text)``,
        matching the four Gradio output components.
    """
    if image_np_input is None:
        return "Please upload an image.", {}, None, "N/A"

    logger.info(f"--- New Request (Damage Thr: {damage_thresh:.2f}, Part Thr: {part_thresh:.2f}) ---")
    start_time = time.time()

    # Gradio provides RGB numpy, convert to BGR for OpenCV/YOLO internal, PIL for CLIP
    image_np_bgr = cv2.cvtColor(image_np_input, cv2.COLOR_RGB2BGR)
    image_pil = Image.fromarray(image_np_input)  # Input numpy is RGB

    final_output_image = None
    assignment_text = "Processing..."
    classification_result = "Error"
    probabilities = {}

    # --- Stage 1: CLIP Classification ---
    try:
        classification_result, probabilities = classify_image_clip(image_pil)
    except Exception as e:
        logger.error(f"Error in CLIP stage: {e}", exc_info=True)
        assignment_text = f"CLIP Error: {e}"
        # Prepare original image for display in case of error
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)

    # --- Stage 2 & 3: Segmentation and Assignment (if 'Car') ---
    if classification_result == "Car":
        logger.info("Image classified as Car. Running segmentation and assignment...")
        try:
            final_output_image, assignment_text = process_car_image(
                image_np_bgr, damage_thresh, part_thresh
            )
        except Exception as e:
            logger.error(f"Error in segmentation/assignment stage: {e}", exc_info=True)
            assignment_text = f"Segmentation Error: {e}"
            final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
    elif classification_result == "Not Car":
        logger.info("Image classified as Not Car.")
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
        assignment_text = "Image classified as Not Car."
    elif final_output_image is None:
        # classify_image_clip returned an error label without raising:
        # still show the original image.
        logger.error("CLIP classification failed.")
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
        assignment_text = "Error during classification."

    # --- Cleanup ---
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    end_time = time.time()
    logger.info(f"Total processing time: {end_time - start_time:.2f} seconds.")
    # Return all results
    return classification_result, probabilities, final_output_image, assignment_text


# --- Gradio Interface ---
logger.info("Setting up Gradio interface...")
title = "🚗 Car Damage Detection"  # Updated Title
description = """ 1. **Upload** an image of a vehicle. 2. **Classification:** Determines if the image contains a car (using CLIP). 3. **Segmentation:** If it's a car, detects car parts and damages (using YOLOv8 for both). Adjust confidence thresholds using sliders. 4. **Assignment:** Assigns detected damages to the corresponding car part based on mask overlap (threshold >40%). 5. **Output:** Shows the image with overlaid masks (Green=Part, Red=Damage) and lists the damage assignments. """
examples = []  # Add example image paths if uploaded (e.g., ["./example1.jpg"])

# Define Inputs and Outputs including sliders
input_image = gr.Image(type="numpy", label="Upload Car Image")
damage_threshold_slider = gr.Slider(
    minimum=0.05, maximum=0.95, step=0.05,
    value=DEFAULT_DAMAGE_PRED_THRESHOLD, label="Damage Confidence Threshold",
)
part_threshold_slider = gr.Slider(
    minimum=0.05, maximum=0.95, step=0.05,
    value=DEFAULT_PART_PRED_THRESHOLD, label="Part Confidence Threshold",
)
output_classification = gr.Textbox(label="1. Classification Result")
output_probabilities = gr.Label(label="Classification Probabilities")
output_image_display = gr.Image(type="numpy", label="3. Segmentation Visualization")
output_assignment = gr.Textbox(label="2. Damage Assignments", lines=5, interactive=False)

# Launch the interface with updated inputs/title
iface = gr.Interface(
    fn=predict_pipeline,
    inputs=[input_image, damage_threshold_slider, part_threshold_slider],  # Sliders added
    outputs=[output_classification, output_probabilities, output_image_display, output_assignment],
    title=title,  # Updated title
    description=description,
    examples=examples,
    allow_flagging="never",
)

if __name__ == "__main__":
    logger.info("Launching Gradio app...")
    iface.launch()  # Set share=True for public link if needed