import gradio as gr from transformers import AutoModelForCausalLM, AutoTokenizer from collections import defaultdict, OrderedDict import re import torch from threading import Lock # Configuración inicial model_name = "microsoft/DialoGPT-small" device = "cuda" if torch.cuda.is_available() else "cpu" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForCausalLM.from_pretrained(model_name).to(device) model.eval() # Cargar alojamientos en memoria con preprocesamiento def cargar_alojamientos(): with open("alojamientos.txt", "r", encoding="utf-8") as file: alojamientos = file.read().split("\n\n") return {idx: alojamiento for idx, alojamiento in enumerate(alojamientos)} alojamientos_info = cargar_alojamientos() # Índice invertido optimizado indice_palabras = defaultdict(set) for idx, alojamiento in alojamientos_info.items(): for palabra in re.split(r'\W+', alojamiento.lower()): if len(palabra) > 2: indice_palabras[palabra].add(idx) # Caché LRU con límite de tamaño class LRUCache: def __init__(self, capacity=100): self.cache = OrderedDict() self.capacity = capacity self.lock = Lock() def get(self, key): with self.lock: if key not in self.cache: return None self.cache.move_to_end(key) return self.cache[key] def put(self, key, value): with self.lock: if key in self.cache: self.cache.move_to_end(key) self.cache[key] = value if len(self.cache) > self.capacity: self.cache.popitem(last=False) cache_respuestas = LRUCache() cache_paginas = LRUCache() # Sinónimos de búsqueda sinonimos = ["alojamiento", "alquiler", "hospedaje", "residencia", "vivienda"] # Función de búsqueda optimizada def buscar_alojamiento(consulta): consulta = consulta.lower() cached = cache_respuestas.get(consulta) if cached is not None: return cached palabras = set(re.split(r'\W+', consulta)) indices = set() for palabra in palabras: if palabra in indice_palabras or palabra in sinonimos: indices.update(indice_palabras[palabra]) resultados = [alojamientos_info[idx] for idx in indices] resultados.sort() # Ordenar para mantener consistencia cache_respuestas.put(consulta, resultados) cache_paginas.put(consulta, 0) return resultados # Formateo visual optimizado def formatear_alojamiento(texto): bloques = texto.split("\n\n") resultado = [] nombre_alojamiento = "" datos_contacto = [] tipos_alojamiento = [] for bloque in bloques: lineas = bloque.split("\n") for linea in lineas: if "Alojamiento:" in linea or "Alquiler:" in linea: nombre_alojamiento = linea # Guardamos el nombre del alojamiento elif any(tag in linea for tag in ["Dirección:", "Teléfono:", "Email:", "Mascotas:", "Wifi:", "Directv:", "Ropa blanca:", "Habilitación provincial:"]): datos_contacto.append(linea) elif "Plazas:" in linea or any(tag in linea for tag in ["Descripción:", "Servicios:"]): tipos_alojamiento.append(linea) if nombre_alojamiento: resultado.append(nombre_alojamiento) if datos_contacto: resultado.append("\n".join(datos_contacto)) if tipos_alojamiento: resultado.append("Tipos de alojamiento:\n" + "\n".join(tipos_alojamiento)) return "\n\n".join(resultado) # Paginación y resultados def mostrar_resultados(consulta): resultados = buscar_alojamiento(consulta) if not resultados: return "Lo siento, no encontré información exacta. Intenta preguntar de otra manera.", "" pagina = cache_paginas.get(consulta) or 0 inicio, fin = pagina * 3, (pagina + 1) * 3 resultados_pagina = resultados[inicio:fin] respuesta = "\n\n".join(formatear_alojamiento(r) for r in resultados_pagina) pregunta_mas = "" if fin < len(resultados): cache_paginas.put(consulta, pagina + 1) pregunta_mas = "¿Quieres ver más resultados? Escribe 'sí' para continuar." elif pagina > 0: pregunta_mas = "¿Quieres ver resultados anteriores? Escribe 'atrás' para volver." return respuesta, pregunta_mas # Interfaz con Gradio with gr.Blocks(title="Chat de Turismo") as iface: gr.Markdown("### Asistente de Turismo - Alojamientos") output_box = gr.Textbox(label="Historial", lines=15, interactive=False) input_box = gr.Textbox(label="Consulta", placeholder="Escribe aquí y presiona Enter...") extra_box = gr.Textbox(label="Opciones", interactive=False) send_button = gr.Button("Enviar") send_button.click(mostrar_resultados, inputs=input_box, outputs=[output_box, extra_box]) input_box.submit(mostrar_resultados, inputs=input_box, outputs=[output_box, extra_box]) iface.launch(share=True, inbrowser=True)