# app.py
"""Gradio app: CLIP car / not-car gate, then YOLOv8 damage + part segmentation.

Pipeline per request:
1. CLIP classifies the upload as "Car" or "Not Car".
2. For cars, two YOLOv8 segmentation models predict damage and part masks.
3. Each damage mask is assigned to the part mask it overlaps the most.
4. The masks are drawn on the image and the assignments returned as text.
"""
import gradio as gr
import torch
import clip
from PIL import Image
import numpy as np
import os
import cv2
import gc  # Garbage collector
import logging
import random  # For annotator colors
import time  # For timing checks
import traceback  # For detailed error printing

# --- YOLOv8 Imports ---
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator  # For drawing YOLO results

# --- Setup Logging ---
logging.getLogger("ultralytics").setLevel(logging.WARNING)  # Reduce YOLO logging noise
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
DAMAGE_CLASSES = ['Cracked', 'Scratch', 'Flaking', 'Broken part', 'Corrosion', 'Dent', 'Paint chip', 'Missing part']
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)
CAR_PART_CLASSES = [
    "Quarter-panel", "Front-wheel", "Back-window", "Trunk", "Front-door",
    "Rocker-panel", "Grille", "Windshield", "Front-window", "Back-door",
    "Headlight", "Back-wheel", "Back-windshield", "Hood", "Fender",
    "Tail-light", "License-plate", "Front-bumper", "Back-bumper", "Mirror",
    "Roof"
]
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)
CLIP_TEXT_FEATURES_PATH = "./clip_text_features.pt"
DAMAGE_MODEL_WEIGHTS_PATH = "./best.pt"
PART_MODEL_WEIGHTS_PATH = "./partdetection_yolobest.pt"
DEFAULT_DAMAGE_PRED_THRESHOLD = 0.4
DEFAULT_PART_PRED_THRESHOLD = 0.3

# --- Device Setup ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {DEVICE}")


def _load_segmentation_model(weights_path, label, expected_names, names_const):
    """Load one YOLOv8 segmentation model and validate its task.

    Args:
        weights_path: Path of the ``.pt`` weights file.
        label: Capitalised model label used in messages ("Damage" / "Part").
        expected_names: Class names hard-coded in this file.
        names_const: Name of the constant holding ``expected_names``
            (only used in log messages).

    Returns:
        ``(model, error_msg, class_names)``. ``model`` is ``None`` and
        ``error_msg`` is set when loading failed or the model is not a
        segmentation model. ``class_names`` is the model's own name list when
        it differs from ``expected_names``, otherwise ``expected_names``.
    """
    try:
        logger.info(f"Loading {label} YOLOv8 model from {weights_path}...")
        if not os.path.exists(weights_path):
            raise FileNotFoundError(f"{label} model weights not found: {weights_path}.")
        model = YOLO(weights_path)
        model.to(DEVICE)
        logger.info(f"{label} model task: {model.task}")
        if model.task != 'segment':
            error_msg = (
                f"CRITICAL ERROR: {label} model task is {model.task}, not 'segment'. "
                "This model won't produce masks!"
            )
            logger.error(error_msg)
            return None, error_msg, expected_names  # Invalidate model
        class_names = expected_names
        loaded_names = list(model.names.values())
        if loaded_names != expected_names:
            # Trust the names stored in the weights over the hard-coded list.
            logger.warning(f"Mismatch: Defined {names_const} vs names in {weights_path}")
            class_names = loaded_names
            logger.warning(f"Updated {names_const} to: {class_names}")
        logger.info(f"{label} YOLOv8 model loaded.")
        return model, None, class_names
    except Exception as e:
        error_msg = f"{label} YOLO load error: {e}"
        logger.error(error_msg, exc_info=True)
        return None, error_msg, expected_names


# --- MODEL LOADING ---
print("--- Initializing Models ---")
clip_model, clip_preprocess, clip_text_features = None, None, None
damage_model, part_model = None, None
clip_load_error_msg, damage_load_error_msg, part_load_error_msg = None, None, None

try:
    logger.info("Loading CLIP model (ViT-B/16)...")
    clip_model, clip_preprocess = clip.load("ViT-B/16", device=DEVICE, jit=False)
    clip_model.eval()
    if not os.path.exists(CLIP_TEXT_FEATURES_PATH):
        raise FileNotFoundError(f"CLIP text features not found: {CLIP_TEXT_FEATURES_PATH}.")
    # NOTE(review): torch.load unpickles the file; only load feature files from
    # a trusted source (consider weights_only=True if the torch version allows).
    clip_text_features = torch.load(CLIP_TEXT_FEATURES_PATH, map_location=DEVICE)
    logger.info(f"CLIP loaded (Text Features dtype: {clip_text_features.dtype}).")
except Exception as e:
    clip_load_error_msg = f"CLIP load error: {e}"
    logger.error(clip_load_error_msg, exc_info=True)

damage_model, damage_load_error_msg, DAMAGE_CLASSES = _load_segmentation_model(
    DAMAGE_MODEL_WEIGHTS_PATH, "Damage", DAMAGE_CLASSES, "DAMAGE_CLASSES"
)
part_model, part_load_error_msg, CAR_PART_CLASSES = _load_segmentation_model(
    PART_MODEL_WEIGHTS_PATH, "Part", CAR_PART_CLASSES, "CAR_PART_CLASSES"
)
# Keep the counts in sync with the (possibly replaced) class lists.
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)

print("--- Model loading process finished. ---")
if clip_load_error_msg:
    print(f"CLIP STATUS: {clip_load_error_msg}")
else:
    print("CLIP STATUS: Loaded OK.")
if damage_load_error_msg:
    print(f"DAMAGE MODEL STATUS: {damage_load_error_msg}")
else:
    print("DAMAGE MODEL STATUS: Loaded OK.")
if part_load_error_msg:
    print(f"PART MODEL STATUS: {part_load_error_msg}")
else:
    print("PART MODEL STATUS: Loaded OK.")


# --- Add DirectVisualizer class for fallback visualization ---
class DirectVisualizer:
    """Fallback visualizer for when Ultralytics Annotator doesn't work.

    Works on a private copy of the BGR image passed to the constructor and
    draws boolean masks with plain OpenCV calls.
    """

    # color_type -> (BGR color, overlay alpha); anything else uses the default.
    _STYLES = {"damage": ((0, 0, 255), 0.4)}  # BGR Red
    _DEFAULT_STYLE = ((0, 255, 0), 0.3)  # BGR Green

    def __init__(self, image):
        self.image = image.copy()

    def draw_masks(self, masks_np, class_ids, class_names, color_type="damage"):
        """Draw masks directly using OpenCV.

        Args:
            masks_np: Boolean array indexed as ``[mask, row, col]`` with the
                same height/width as the image.
            class_ids: One class index per mask.
            class_names: Names indexed by class id; ids outside the list get
                no text label.
            color_type: ``"damage"`` draws red at alpha 0.4, any other value
                draws green at alpha 0.3.
        """
        if masks_np.shape[0] == 0:
            return
        color, alpha = self._STYLES.get(color_type, self._DEFAULT_STYLE)
        for mask, class_id in zip(masks_np, class_ids):
            if not np.any(mask):  # Skip empty masks
                continue
            # Blend a solid-color copy of the masked region into the image.
            overlay = self.image.copy()
            overlay[mask] = color
            cv2.addWeighted(overlay, alpha, self.image, 1 - alpha, 0, self.image)
            # Draw contour
            mask_8bit = mask.astype(np.uint8) * 255
            contours, _ = cv2.findContours(mask_8bit, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            cv2.drawContours(self.image, contours, -1, color, 2)
            # Add text label at the mask centroid
            try:
                if 0 <= class_id < len(class_names):
                    label = class_names[class_id]
                    M = cv2.moments(mask_8bit)
                    if M["m00"] > 0:
                        cx = int(M["m10"] / M["m00"])
                        cy = int(M["m01"] / M["m00"])
                        cv2.putText(self.image, label, (cx, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            except Exception as e:
                # A missing label must not abort the remaining masks.
                logger.warning(f"Error adding label: {e}")

    def result(self):
        """Return the final image"""
        return self.image


# --- Prediction Functions ---
def classify_image_clip(image_pil):
    """Classify a PIL image as "Car" / "Not Car" with CLIP.

    Returns:
        ``(label, probabilities)``. ``label`` is ``"Car"``, ``"Not Car"`` or
        an ``"Error: ..."`` string; ``probabilities`` maps label to a float
        confidence (``{"Error": 1.0}`` on failure).
    """
    # The model can load while the text features are missing, so check both.
    if clip_model is None or clip_preprocess is None or clip_text_features is None:
        return "Error: CLIP Model Not Loaded", {"Error": 1.0}
    try:
        if image_pil.mode != "RGB":
            image_pil = image_pil.convert("RGB")
        image_input = clip_preprocess(image_pil).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            image_features = clip_model.encode_image(image_input)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            text_features_matched = clip_text_features
            if image_features.dtype != clip_text_features.dtype:
                text_features_matched = clip_text_features.to(image_features.dtype)
            similarity = (image_features @ text_features_matched.T) * clip_model.logit_scale.exp()
            probs = similarity.softmax(dim=-1).squeeze().cpu()
        # Index 0 is read as "Car" and index 1 as "Not Car".
        car_prob, not_car_prob = float(probs[0]), float(probs[1])
        label = "Car" if car_prob > not_car_prob else "Not Car"
        # Floats (not formatted strings) so the Label output gets numeric
        # confidences, consistent with the error path above.
        return label, {"Car": round(car_prob, 3), "Not Car": round(not_car_prob, 3)}
    except Exception as e:
        logger.error(f"CLIP Error: {e}", exc_info=True)
        return "Error: CLIP", {"Error": 1.0}


def _log_ultralytics_version():
    """Log the installed Ultralytics version (best effort)."""
    from importlib import metadata

    try:
        logger.info(f"Ultralytics version: {metadata.version('ultralytics')}")
    except metadata.PackageNotFoundError:
        logger.warning("Could not determine Ultralytics version")


def _make_annotator_tensor(image_np_bgr):
    """Return the image as a uint8 tensor on DEVICE, or None on failure."""
    logger.info("Preparing image tensor for annotator...")
    try:
        if image_np_bgr.dtype != np.uint8:
            logger.warning(f"Converting input image from {image_np_bgr.dtype} to uint8 for tensor creation.")
            image_np_uint8 = image_np_bgr.astype(np.uint8)
        else:
            image_np_uint8 = image_np_bgr
        # Create tensor in HWC format on the correct device.
        # NOTE(review): Annotator.masks appears to expect im_gpu as a
        # [3, h, w] float tensor in [0, 1] matching the mask size -- confirm
        # against the installed Ultralytics version. A failure there is caught
        # and handled by the DirectVisualizer fallback.
        im_tensor = torch.from_numpy(image_np_uint8).to(DEVICE)
        logger.info(
            f"Image tensor for annotator: shape={im_tensor.shape}, "
            f"dtype={im_tensor.dtype}, device={im_tensor.device}"
        )
        return im_tensor
    except Exception as e_tensor:
        logger.error(f"Could not create image tensor: {e_tensor}. Mask visualization will fail.", exc_info=True)
        return None


def _extract_detections(result, kind):
    """Pull masks, class ids and boxes out of one YOLO result.

    Args:
        result: First element of the list returned by ``model.predict``.
        kind: ``"damage"`` or ``"part"``; only used in log messages.

    Returns:
        Dict with ``masks_raw`` (tensor, empty when no masks are available),
        ``class_ids`` (int numpy array) and ``boxes_xyxy`` (CPU tensor).
    """
    boxes = result.boxes
    logger.info(f"Found {len(boxes) if boxes is not None else 0} potential {kind}s.")

    empty_masks = torch.empty((0, 0, 0), device=DEVICE)
    masks = result.masks
    if masks is None:
        logger.warning(f"No {kind} masks in result! Check if {kind} model is segmentation type.")
        masks_raw = empty_masks
    else:
        logger.info(f"{kind.capitalize()} masks type: {type(masks)}")
        try:
            if hasattr(masks, 'data'):
                masks_raw = masks.data
                logger.info(f"{kind.capitalize()} masks via .data: shape={masks_raw.shape}, dtype={masks_raw.dtype}")
            elif isinstance(masks, torch.Tensor):
                masks_raw = masks
                logger.info(f"{kind.capitalize()} masks as tensor: shape={masks_raw.shape}, dtype={masks_raw.dtype}")
            else:
                logger.warning(f"Unknown mask type: {type(masks)}")
                masks_raw = empty_masks
        except Exception as e_mask:
            logger.error(f"Error accessing {kind} masks: {e_mask}", exc_info=True)
            masks_raw = empty_masks

    if boxes is not None:
        class_ids = boxes.cls.cpu().numpy().astype(int)
        boxes_xyxy = boxes.xyxy.cpu()
    else:
        class_ids = np.array([], dtype=int)
        boxes_xyxy = torch.empty((0, 4))
    return {"masks_raw": masks_raw, "class_ids": class_ids, "boxes_xyxy": boxes_xyxy}


def _resize_masks(masks_tensor, target_h, target_w):
    """Convert a mask tensor to boolean numpy masks of size (target_h, target_w).

    Returns an array with zero masks when the input is empty or resizing fails.
    """
    if masks_tensor is None or masks_tensor.numel() == 0 or masks_tensor.shape[0] == 0:
        logger.warning("Empty masks tensor passed to resize_masks")
        return np.zeros((0, target_h, target_w), dtype=bool)
    try:
        masks_np_bool = masks_tensor.cpu().numpy().astype(bool)
        logger.info(f"Resizing masks from {masks_np_bool.shape} to ({target_h}, {target_w})")
        if masks_np_bool.shape[1] == target_h and masks_np_bool.shape[2] == target_w:
            return masks_np_bool
        # Nearest-neighbour keeps the masks strictly binary.
        return np.array([
            cv2.resize(mask.astype(np.uint8), (target_w, target_h), interpolation=cv2.INTER_NEAREST).astype(bool)
            for mask in masks_np_bool
        ])
    except Exception as e_resize:
        logger.error(f"Error resizing masks: {e_resize}", exc_info=True)
        return np.zeros((0, target_h, target_w), dtype=bool)


def _assign_damages_to_parts(damage_masks_np, damage_ids, part_masks_np, part_ids,
                             damage_threshold, part_threshold, overlap_threshold=0.4):
    """Describe, for each damage mask, the part it overlaps the most.

    A damage is assigned to the part covering the largest fraction of its
    area, provided that fraction reaches ``overlap_threshold``. Damages
    smaller than 10 pixels or with an unknown class id are skipped.

    Returns:
        List of human-readable assignment strings.
    """
    logger.info("Calculating overlap...")
    n_damages, n_parts = damage_masks_np.shape[0], part_masks_np.shape[0]
    if n_damages == 0 and n_parts == 0:
        return ["No damages or parts detected above thresholds."]
    if n_damages == 0:
        return [f"No damages detected above threshold {damage_threshold}."]
    if n_parts == 0:
        return [f"{n_damages} damages found, but no parts detected/matched above threshold {part_threshold}."]

    assignments = []
    for damage_mask, damage_class_id in zip(damage_masks_np, damage_ids):
        try:
            damage_name = DAMAGE_CLASSES[damage_class_id]
        except IndexError:
            logger.warning(f"Invalid damage ID {damage_class_id}")
            continue
        damage_area = np.sum(damage_mask)
        if damage_area < 10:  # Ignore tiny masks
            continue
        max_overlap = 0
        assigned_part_name = "Unknown / Outside Parts"
        for part_mask, part_class_id in zip(part_masks_np, part_ids):
            try:
                part_name = CAR_PART_CLASSES[part_class_id]
            except IndexError:
                logger.warning(f"Invalid part ID {part_class_id}")
                continue
            overlap_ratio = np.sum(np.logical_and(damage_mask, part_mask)) / damage_area
            if overlap_ratio > max_overlap:
                max_overlap = overlap_ratio
                # Only a new best overlap may change the assignment.
                if max_overlap >= overlap_threshold:
                    assigned_part_name = part_name
        assignment_desc = f"{damage_name} in {assigned_part_name}"
        if assigned_part_name == "Unknown / Outside Parts":
            assignment_desc += f" (Overlap < {overlap_threshold*100:.0f}%)"
        assignments.append(assignment_desc)
    return assignments


def _draw_with_annotator(annotator, detections, class_names, im_gpu, kind, channel, alpha):
    """Draw one detection set (masks + labelled boxes) with the Annotator.

    Args:
        annotator: Ultralytics ``Annotator`` to draw on.
        detections: Dict produced by ``_extract_detections``.
        class_names: Names indexed by class id.
        im_gpu: Image tensor handed to ``Annotator.masks``.
        kind: ``"damage"`` or ``"part"``; only used in log messages.
        channel: Index of the color channel that carries the intensity.
        alpha: Mask opacity.

    Returns:
        True when everything was drawn, False when drawing raised.
    """
    class_ids = detections["class_ids"]
    try:
        colors = [
            tuple(random.randint(100, 200) if c == channel else 0 for c in range(3))
            for _ in class_ids
        ]
        mask_data = detections["masks_raw"]
        if mask_data.device != im_gpu.device:
            mask_data = mask_data.to(im_gpu.device)
        annotator.masks(mask_data, colors=colors, im_gpu=im_gpu, alpha=alpha)
        box_color = tuple(200 if c == channel else 0 for c in range(3))
        for box, cls_id in zip(detections["boxes_xyxy"], class_ids):
            try:
                label = f"{class_names[cls_id]}"
            except IndexError:
                logger.warning(f"Invalid {kind} ID {cls_id}")
                continue
            annotator.box_label(box, label=label, color=box_color)
    except Exception as e_draw:
        logger.error(f"Error drawing {kind} masks with annotator: {e_draw}", exc_info=True)
        return False
    logger.info(f"Successfully drew {kind} masks with annotator")
    return True


def _render_annotations(image_np_bgr, im_gpu, damage, parts):
    """Draw part and damage masks, preferring the Ultralytics Annotator.

    The Annotator result is used only when every non-empty detection set was
    drawn successfully; otherwise both sets are redrawn with
    ``DirectVisualizer`` so no layer is silently missing from the output.

    Returns:
        Annotated BGR image (a copy of the input when nothing could be drawn).
    """
    want_parts = parts["masks_raw"].numel() > 0
    want_damage = damage["masks_raw"].numel() > 0
    if not (want_parts or want_damage):
        logger.info("No masks to draw")
        return image_np_bgr.copy()

    # First attempt: Ultralytics annotator
    ultralytic_viz_success = False
    if im_gpu is not None:
        try:
            logger.info("Trying Ultralytics Annotator for visualization...")
            annotator = Annotator(image_np_bgr.copy(), line_width=2, example=CAR_PART_CLASSES)
            # Parts first (background layer), damages on top.
            parts_ok = (not want_parts) or _draw_with_annotator(
                annotator, parts, CAR_PART_CLASSES, im_gpu, "part", channel=1, alpha=0.3
            )
            damage_ok = (not want_damage) or _draw_with_annotator(
                annotator, damage, DAMAGE_CLASSES, im_gpu, "damage", channel=0, alpha=0.4
            )
            if parts_ok and damage_ok:
                ultralytic_viz_success = True
                logger.info("Using Ultralytics annotator visualization")
                return annotator.result()
        except Exception as e_anno:
            logger.error(f"Error with Ultralytics annotator: {e_anno}", exc_info=True)
    if not ultralytic_viz_success:
        logger.warning("Ultralytics annotator visualization failed, will try direct approach")

    # Second attempt: Direct visualization with OpenCV
    try:
        logger.info("Using DirectVisualizer as fallback...")
        direct_viz = DirectVisualizer(image_np_bgr.copy())
        # Draw part masks first (background layer)
        if parts["masks_np"].shape[0] > 0:
            logger.info(f"Drawing {parts['masks_np'].shape[0]} part masks directly")
            direct_viz.draw_masks(parts["masks_np"], parts["class_ids"], CAR_PART_CLASSES, "part")
        # Draw damage masks on top
        if damage["masks_np"].shape[0] > 0:
            logger.info(f"Drawing {damage['masks_np'].shape[0]} damage masks directly")
            direct_viz.draw_masks(damage["masks_np"], damage["class_ids"], DAMAGE_CLASSES, "damage")
        logger.info("Direct visualization successful")
        return direct_viz.result()
    except Exception as e_direct:
        logger.error(f"Error with direct visualization: {e_direct}", exc_info=True)
        # If direct visualization also fails, use the original image
        return image_np_bgr.copy()


def process_car_image(image_np_bgr, damage_threshold, part_threshold):
    """Segment damages and parts on a BGR image and assign damages to parts.

    Args:
        image_np_bgr: Input image as a BGR numpy array.
        damage_threshold: Confidence threshold for the damage model.
        part_threshold: Confidence threshold for the part model.

    Returns:
        ``(annotated_image_rgb, assignment_text)``. On any failure the
        original image (converted to RGB) and an error text are returned.
    """
    guard_error = None
    if damage_model is None:
        guard_error = f"Error: Damage model failed to load ({damage_load_error_msg})"
    elif part_model is None:
        guard_error = f"Error: Part model failed to load ({part_load_error_msg})"
    elif damage_model.task != 'segment':
        guard_error = "Error: Damage model is not a segmentation model."
    elif part_model.task != 'segment':
        guard_error = "Error: Part model is not a segmentation model."
    if guard_error is not None:
        return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), guard_error

    final_assignments = []
    annotated_image_bgr = image_np_bgr.copy()
    img_h, img_w = image_np_bgr.shape[:2]
    logger.info("Starting combined YOLO processing...")
    try:
        _log_ultralytics_version()
        im_tensor_gpu_for_annotator = _make_annotator_tensor(image_np_bgr)

        # --- 1. Predict Damages ---
        logger.info(f"Running Damage Segmentation (Threshold: {damage_threshold})...")
        damage_results = damage_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=damage_threshold)
        damage = _extract_detections(damage_results[0], "damage")

        # --- 2. Predict Parts ---
        logger.info(f"Running Part Segmentation (Threshold: {part_threshold})...")
        part_results = part_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=part_threshold)
        parts = _extract_detections(part_results[0], "part")

        # --- 3. Resize Masks ---
        damage["masks_np"] = _resize_masks(damage["masks_raw"], img_h, img_w)
        parts["masks_np"] = _resize_masks(parts["masks_raw"], img_h, img_w)

        # --- 4. Calculate Overlap ---
        final_assignments.extend(_assign_damages_to_parts(
            damage["masks_np"], damage["class_ids"],
            parts["masks_np"], parts["class_ids"],
            damage_threshold, part_threshold,
        ))
        logger.info(f"Assignment results: {final_assignments}")

        # --- 5. Visualize (Annotator first, OpenCV fallback) ---
        annotated_image_bgr = _render_annotations(image_np_bgr, im_tensor_gpu_for_annotator, damage, parts)
    except Exception as e:
        logger.error(f"Error during combined processing: {e}", exc_info=True)
        traceback.print_exc()
        final_assignments.append(f"Error during processing: {str(e)}")
        annotated_image_bgr = image_np_bgr.copy()

    assignment_text = "\n".join(final_assignments) if final_assignments else "No damage assignments generated."
    final_output_image_rgb = cv2.cvtColor(annotated_image_bgr, cv2.COLOR_BGR2RGB)
    return final_output_image_rgb, assignment_text


# --- Main Gradio Function ---
def predict_pipeline(image_np_input, damage_thresh, part_thresh):
    """Gradio entry point: classify the upload, then segment it if it is a car.

    Args:
        image_np_input: Uploaded image as an RGB numpy array, or None.
        damage_thresh: Damage confidence threshold from the slider.
        part_thresh: Part confidence threshold from the slider.

    Returns:
        ``(classification_result, probabilities, output_image_rgb,
        assignment_text)`` in the order of the interface outputs.
    """
    if image_np_input is None:
        return "Please upload an image.", {}, None, "N/A"
    logger.info(f"--- New Request (Damage Thr: {damage_thresh:.2f}, Part Thr: {part_thresh:.2f}) ---")
    start_time = time.time()
    image_np_bgr = cv2.cvtColor(image_np_input, cv2.COLOR_RGB2BGR)
    image_pil = Image.fromarray(image_np_input)
    final_output_image, assignment_text, classification_result, probabilities = None, "Processing...", "Error", {}
    try:
        classification_result, probabilities = classify_image_clip(image_pil)
    except Exception as e:
        logger.error(f"CLIP Error: {e}", exc_info=True)
        assignment_text = f"CLIP Error: {e}"
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)

    if classification_result == "Car":
        try:
            final_output_image, assignment_text = process_car_image(image_np_bgr, damage_thresh, part_thresh)
        except Exception as e:
            logger.error(f"Seg/Assign Error: {e}", exc_info=True)
            assignment_text = f"Seg Error: {e}"
            final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
    elif classification_result == "Not Car":
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
        assignment_text = "Image classified as Not Car."
    elif final_output_image is None:
        # classify_image_clip reported an error label without raising.
        final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
        assignment_text = "Error during classification."

    # Free per-request memory before the next upload.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    logger.info(f"Total processing time: {time.time() - start_time:.2f}s.")
    return classification_result, probabilities, final_output_image, assignment_text


# --- Gradio Interface ---
logger.info("Setting up Gradio interface...")
title = "🚗 Car Damage Detection"
description = "1. Upload... 2. Classify... 3. Segment... 4. Assign... 5. Output..."  # Shortened
input_image = gr.Image(type="numpy", label="Upload Car Image")
damage_threshold_slider = gr.Slider(minimum=0.05, maximum=0.95, step=0.05, value=DEFAULT_DAMAGE_PRED_THRESHOLD, label="Damage Confidence Threshold")
part_threshold_slider = gr.Slider(minimum=0.05, maximum=0.95, step=0.05, value=DEFAULT_PART_PRED_THRESHOLD, label="Part Confidence Threshold")
output_classification = gr.Textbox(label="1. Classification Result")
output_probabilities = gr.Label(label="Classification Probabilities")
output_image_display = gr.Image(type="numpy", label="3. Segmentation Visualization")
output_assignment = gr.Textbox(label="2. Damage Assignments", lines=5, interactive=False)
iface = gr.Interface(
    fn=predict_pipeline,
    inputs=[input_image, damage_threshold_slider, part_threshold_slider],
    outputs=[output_classification, output_probabilities, output_image_display, output_assignment],
    title=title,
    description=description,
    allow_flagging="never"
)

if __name__ == "__main__":
    logger.info("Launching Gradio app...")
    iface.launch()