from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import torch
from diffusers import StableDiffusionPipeline
from diffusers.models import AutoencoderKL
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion import rescale_noise_cfg
from diffusers.schedulers import KarrasDiffusionSchedulers
from tqdm import tqdm
from transformers import CLIPTextModel, CLIPTokenizer, CLIPImageProcessor

from config import Range
from models.unet_2d_condition import FreeUUNet2DConditionModel


class CrossImageAttentionStableDiffusionPipeline(StableDiffusionPipeline):
    """A modification of the standard StableDiffusionPipeline to incorporate our cross-image attention."""

    def __init__(self,
                 vae: AutoencoderKL,
                 text_encoder: CLIPTextModel,
                 tokenizer: CLIPTokenizer,
                 unet: FreeUUNet2DConditionModel,
                 scheduler: KarrasDiffusionSchedulers,
                 safety_checker: StableDiffusionSafetyChecker,
                 feature_extractor: CLIPImageProcessor,
                 requires_safety_checker: bool = True):
        super().__init__(
            vae, text_encoder, tokenizer, unet, scheduler, safety_checker, feature_extractor, requires_safety_checker
        )
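
    # Compared to the stock StableDiffusionPipeline.__call__, the sampling loop below
    # (i) runs the UNet twice per timestep, once with cross-image attention enabled
    # ('perform_swap': True) and once without, (ii) blends the two noise predictions
    # via either classifier-free guidance or a decaying swap-guidance schedule, and
    # (iii) advances the latents with an explicit DDPM step that consumes the
    # pre-computed noise maps `zs` instead of calling scheduler.step().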
    def __call__(
            self,
            prompt: Union[str, List[str]] = None,
            height: Optional[int] = None,
            width: Optional[int] = None,
            num_inference_steps: int = 50,
            guidance_scale: float = 7.5,
            negative_prompt: Optional[Union[str, List[str]]] = None,
            num_images_per_prompt: Optional[int] = 1,
            eta: float = 0.0,
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            latents: Optional[torch.FloatTensor] = None,
            prompt_embeds: Optional[torch.FloatTensor] = None,
            negative_prompt_embeds: Optional[torch.FloatTensor] = None,
            output_type: Optional[str] = "pil",
            return_dict: bool = True,
            callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
            callback_steps: int = 1,
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            guidance_rescale: float = 0.0,
            swap_guidance_scale: float = 1.0,
            cross_image_attention_range: Range = Range(10, 90),
            # DDPM addition
            zs: Optional[List[torch.Tensor]] = None
    ):
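        """Sample images while sharing attention across the batch.

        Mirrors the arguments of StableDiffusionPipeline.__call__, with three additions:
        `swap_guidance_scale` amplifies the cross-image-attention ("swap") noise
        prediction, `cross_image_attention_range` bounds the denoising steps in which
        swap guidance is applied, and `zs` holds the per-image noise maps (one tensor
        per latent in the batch) consumed by the DDPM update in `perform_ddpm_step`.
        """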
        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds
        )

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # Here, `guidance_scale` is defined analogously to the guidance weight `w` of
        # equation (2) of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf .
        # `guidance_scale = 1` corresponds to doing no classifier-free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0
        # 3. Encode input prompt
        text_encoder_lora_scale = (
            cross_attention_kwargs.get("scale", None) if cross_attention_kwargs is not None else None
        )
        prompt_embeds = self._encode_prompt(
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
        )
        # 4. Prepare timesteps, keeping only as many steps as there are pre-computed
        # noise maps in `zs` and mapping each timestep to its index in that schedule
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps = self.scheduler.timesteps
        t_to_idx = {int(v): k for k, v in enumerate(timesteps[-zs[0].shape[0]:])}
        timesteps = timesteps[-zs[0].shape[0]:]
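
        # Note: `latents` and `zs` are indexed per image below; in the cross-image
        # attention setup the batch is assumed to stack the latents of the images
        # being denoised jointly, so the attention layers can share keys and values
        # across them (an assumption about the caller, not enforced here).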
        # 5. Prepare latent variables
        num_channels_latents = self.unet.config.in_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )
        # 6. Denoising loop (`timesteps` was already trimmed to match `zs` above)
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        op = tqdm(timesteps)
        n_timesteps = len(timesteps)
        count = 0
        for t in op:
            i = t_to_idx[int(t)]

            # expand the latents if we are doing classifier-free guidance
            latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
            # noise prediction with cross-image attention enabled ("swap" branch)
            noise_pred_swap = self.unet(
                latent_model_input,
                t,
                encoder_hidden_states=prompt_embeds,
                cross_attention_kwargs={'perform_swap': True},
                return_dict=False,
            )[0]
            # noise prediction with standard self-attention (no swap)
            noise_pred_no_swap = self.unet(
                latent_model_input,
                t,
                encoder_hidden_states=prompt_embeds,
                cross_attention_kwargs={'perform_swap': False},
                return_dict=False,
            )[0]
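
            # The two branches are combined like classifier-free guidance, with the
            # swapped (cross-image) prediction playing the role of the conditional term:
            #     noise_pred = e_no_swap + s * (e_swap - e_no_swap)
            # where s is either `guidance_scale` (pairing the text branch of the swapped
            # pass with the unconditional branch of the vanilla pass) or a per-step
            # swapping strength interpolated from `swap_guidance_scale` down to
            # max(swap_guidance_scale / 2, 1.0).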
            # perform guidance
            if do_classifier_free_guidance:
                _, noise_swap_pred_text = noise_pred_swap.chunk(2)
                noise_no_swap_pred_uncond, _ = noise_pred_no_swap.chunk(2)
                noise_pred = noise_no_swap_pred_uncond + guidance_scale * (
                        noise_swap_pred_text - noise_no_swap_pred_uncond)
            else:
                is_cross_image_step = cross_image_attention_range.start <= i <= cross_image_attention_range.end
                if swap_guidance_scale > 1.0 and is_cross_image_step:
                    # decay the swapping strength linearly over the schedule
                    swapping_strengths = np.linspace(swap_guidance_scale,
                                                     max(swap_guidance_scale / 2, 1.0),
                                                     n_timesteps)
                    swapping_strength = swapping_strengths[count]
                    noise_pred = noise_pred_no_swap + swapping_strength * (noise_pred_swap - noise_pred_no_swap)
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_swap, guidance_rescale=guidance_rescale)
                else:
                    noise_pred = noise_pred_swap
            # DDPM step: advance each latent in the batch with its own pre-computed noise map
            latents = torch.stack([
                self.perform_ddpm_step(t_to_idx, zs[latent_idx], latents[latent_idx], t, noise_pred[latent_idx], eta)
                for latent_idx in range(latents.shape[0])
            ])

            # call the callback, if provided
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                if callback is not None and i % callback_steps == 0:
                    callback(i, t, latents)

            count += 1
        if output_type != "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False)[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)

        # Offload last model to CPU
        if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:
            self.final_offload_hook.offload()

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
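
    # The step below follows formula (12) of the DDIM paper
    # (https://arxiv.org/pdf/2010.02502.pdf):
    #     x_{t-1} = sqrt(alpha_{t-1}) * x0_pred
    #               + sqrt(1 - alpha_{t-1} - sigma_t^2) * eps_theta
    #               + sigma_t * z
    # except that the direction term here uses eta * variance in place of
    # std_dev_t ** 2 = eta ** 2 * variance, and z is taken from the pre-computed
    # `zs` (falling back to fresh Gaussian noise when it is absent).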
    def perform_ddpm_step(self, t_to_idx, zs, latents, t, noise_pred, eta):
        idx = t_to_idx[int(t)]
        z = zs[idx] if zs is not None else None

        # 1. get previous step value (=t-1)
        prev_timestep = t - self.scheduler.config.num_train_timesteps // self.scheduler.num_inference_steps

        # 2. compute alphas, betas
        alpha_prod_t = self.scheduler.alphas_cumprod[t]
        alpha_prod_t_prev = self.scheduler.alphas_cumprod[
            prev_timestep] if prev_timestep >= 0 else self.scheduler.final_alpha_cumprod
        beta_prod_t = 1 - alpha_prod_t

        # 3. compute predicted original sample from predicted noise, also called
        # "predicted x_0" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf
        pred_original_sample = (latents - beta_prod_t ** (0.5) * noise_pred) / alpha_prod_t ** (0.5)

        # 4. compute variance: "sigma_t(eta)" -> see formula (16)
        # sigma_t = sqrt((1 - alpha_{t-1}) / (1 - alpha_t)) * sqrt(1 - alpha_t / alpha_{t-1})
        variance = self.get_variance(t)
        std_dev_t = eta * variance ** (0.5)

        # Take care of asymmetric reverse process (asyrp)
        model_output_direction = noise_pred

        # 5. compute "direction pointing to x_t" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf
        # (uses eta * variance rather than std_dev_t ** 2 = eta ** 2 * variance)
        pred_sample_direction = (1 - alpha_prod_t_prev - eta * variance) ** (0.5) * model_output_direction

        # 6. compute x_{t-1} without "random noise" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf
        prev_sample = alpha_prod_t_prev ** (0.5) * pred_original_sample + pred_sample_direction

        # 7. add noise if eta > 0
        if eta > 0:
            if z is None:
                z = torch.randn(noise_pred.shape, device=self.device)
            prev_sample = prev_sample + std_dev_t * z

        return prev_sample
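
    # DDIM's formula (16) with eta = 1: the returned value is sigma_t^2, i.e.
    #     sigma_t^2 = ((1 - alpha_{t-1}) / (1 - alpha_t)) * (1 - alpha_t / alpha_{t-1})
    # where alpha denotes the cumulative product of (1 - beta).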
    def get_variance(self, timestep):
        prev_timestep = timestep - self.scheduler.config.num_train_timesteps // self.scheduler.num_inference_steps
        alpha_prod_t = self.scheduler.alphas_cumprod[timestep]
        alpha_prod_t_prev = self.scheduler.alphas_cumprod[
            prev_timestep] if prev_timestep >= 0 else self.scheduler.final_alpha_cumprod
        beta_prod_t = 1 - alpha_prod_t
        beta_prod_t_prev = 1 - alpha_prod_t_prev
        variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
        return variance
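

# Minimal usage sketch (illustrative, not part of the original file): the checkpoint
# name, the `ddpm_invert` helper, and the hyper-parameters are assumptions. In
# practice, `latents` and `zs` come from a DDPM inversion of the input images
# performed elsewhere in the project; a hypothetical helper stands in for it here.
if __name__ == "__main__":
    pipe = CrossImageAttentionStableDiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",  # assumed checkpoint
        unet=FreeUUNet2DConditionModel.from_pretrained(
            "runwayml/stable-diffusion-v1-5", subfolder="unet"),
    ).to("cuda")
    # hypothetical: invert the input images into x_T latents and per-step noise maps
    latents_T, zs = ddpm_invert(pipe, images)  # noqa: F821 (illustrative helper)
    out = pipe(
        prompt=[""] * latents_T.shape[0],
        guidance_scale=1.0,  # disable CFG so the swap-guidance branch is exercised
        eta=1.0,             # full DDPM noise, consumed from `zs`
        swap_guidance_scale=3.5,
        cross_image_attention_range=Range(10, 90),
        latents=latents_T,
        zs=zs,
    )
    out.images[0].save("result.png")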