Spaces:
Running
Running
| from fastapi import FastAPI,Request,File,UploadFile,Form | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import HTMLResponse,JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import pandas as pd | |
| import re | |
| import io | |
| import base64 | |
| import matplotlib.pyplot as plt | |
| import torch | |
| import tensorflow as tf | |
| import fitz | |
| from docx import Document | |
| from pptx import Presentation | |
| import seaborn as sns | |
| import PIL.Image as Image | |
| import fitz | |
| from huggingface_hub import snapshot_download | |
| from transformers import ( | |
| AutoTokenizer, AutoModelForSeq2SeqLM, | |
| AutoModelForCausalLM,pipeline | |
| ) | |
# Image-captioning model: download the BLIP snapshot and build the pipeline.
# ``interpreter`` is bound to None up front so that a failed load leaves a
# defined (but unusable) name instead of an unbound global.
interpreter = None
try:
    print("[Info] installing Salesforce/blip-image-captioning-base ....")
    blip_dir = "./models/blip-base-tf"
    snapshot_download("Salesforce/blip-image-captioning-base", local_dir=blip_dir, local_dir_use_symlinks=False)
    # Build the pipeline from the snapshot that was just downloaded, the same
    # way the other models in this file are loaded from their local directory,
    # rather than resolving the hub id a second time.
    interpreter = pipeline("image-to-text", model=blip_dir)
    print("[Info] Salesforce/blip-image-captioning-base is installed.")
except Exception as exp:
    # Best effort: report the failure and keep starting up without this model.
    print("Can't load the model Salesforce/blip-image-captioning-base")
    print(f"[Error] {str(exp)}")
# Summarization model: download BART, load tokenizer and weights from the
# local snapshot, and wrap them in a pipeline.
# ``summarizer`` is bound to None up front so that a failed load leaves a
# defined (but unusable) name instead of an unbound global.
summarizer = None
try:
    print("[Info] installing facebook/bart-large-cnn ....")
    bart_dir = "./models/bart-large-cnn"
    snapshot_download("facebook/bart-large-cnn", local_dir=bart_dir, local_dir_use_symlinks=False)
    bart_tokenizer = AutoTokenizer.from_pretrained(bart_dir)
    bart_model = AutoModelForSeq2SeqLM.from_pretrained(bart_dir)
    summarizer = pipeline("summarization", model=bart_model, tokenizer=bart_tokenizer)
    print("[Info] facebook/bart-large-cnn is installed")
except Exception as exp:
    # Best effort: report the failure and keep starting up without this model.
    print("Can't load the model facebook/bart-large-cnn")
    print(f"[Error] {str(exp)}")
# Code-generation model: download DeepSeek Coder, load tokenizer and weights
# from the local snapshot, and wrap them in a text-generation pipeline.
# ``generator`` is bound to None up front so that a failed load leaves a
# defined (but unusable) name instead of an unbound global.
generator = None
try:
    print("[Info] installing deepseek-ai/deepseek-coder-1.3b-instruct ")
    deepseek_dir = "./models/deepseek-coder"
    snapshot_download("deepseek-ai/deepseek-coder-1.3b-instruct", local_dir=deepseek_dir, local_dir_use_symlinks=False)
    # trust_remote_code=True lets the snapshot's own Python code run at load time.
    deepseek_tokenizer = AutoTokenizer.from_pretrained(deepseek_dir, trust_remote_code=True)
    deepseek_model = AutoModelForCausalLM.from_pretrained(deepseek_dir, trust_remote_code=True)
    generator = pipeline("text-generation", model=deepseek_model, tokenizer=deepseek_tokenizer)
    print("[Info] deepseek-ai/deepseek-coder-1.3b-instruct is installed")
except Exception as exp:
    # Best effort: report the failure and keep starting up without this model.
    print("Can't load the model deepseek-ai/deepseek-coder-1.3b-instruct")
    print(f"[Error] {str(exp)}")
# Upload size limit, in bytes (1 MiB), used by the upload handlers.
MAX_SIZE = 1024 * 1024

# Template loader for the HTML pages rendered by the page handlers.
templates = Jinja2Templates(directory='templates')

# Application object, CORS policy and static-file mount.
app = FastAPI()
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy - confirm this is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory='static'), name='static')
def index(req: Request):
    """Render ``index.html`` with the incoming request in the template context."""
    # NOTE(review): no route decorator is visible on this function, and the
    # module defines several functions named ``index`` - confirm how each one
    # is registered with ``app``.
    context = {'request': req}
    return templates.TemplateResponse('index.html', context)
def index(req: Request):
    """Render ``text-summarization.html`` with the incoming request in the template context."""
    # NOTE(review): no route decorator is visible on this function, and the
    # module defines several functions named ``index`` - confirm how each one
    # is registered with ``app``.
    context = {'request': req}
    return templates.TemplateResponse('text-summarization.html', context)
def index(req: Request):
    """Render ``data-visualization.html`` with the incoming request in the template context."""
    # NOTE(review): no route decorator is visible on this function, and the
    # module defines several functions named ``index`` - confirm how each one
    # is registered with ``app``.
    context = {'request': req}
    return templates.TemplateResponse('data-visualization.html', context)
def index(req: Request):
    """Render ``image-interpretation.html`` with the incoming request in the template context."""
    # NOTE(review): no route decorator is visible on this function, and the
    # module defines several functions named ``index`` - confirm how each one
    # is registered with ``app``.
    context = {'request': req}
    return templates.TemplateResponse('image-interpretation.html', context)
def interpret(file_img: UploadFile = File(...)):
    """Caption an uploaded image with the module-level ``interpreter`` pipeline.

    Args:
        file_img: Uploaded image. Only ``png``, ``jpg`` and ``jpeg``
            extensions are accepted (compared case-insensitively).

    Returns:
        JSONResponse with ``{"caption": ...}`` and status 200 on success, or
        ``{"error": ...}`` with status 400 when the extension is not
        supported or the image cannot be opened or captioned.
    """
    # NOTE(review): no route decorator is visible on this handler - confirm
    # how it is registered with ``app``.
    supported_extensions = ("png", "jpg", "jpeg")
    # The filename may be missing; lower-casing lets "photo.PNG" through.
    extension = (file_img.filename or "").rsplit(".", 1)[-1].lower()
    if extension not in supported_extensions:
        return JSONResponse(content={"error": "Unsupported file type"}, status_code=400)
    try:
        # Opening the image and reading the result are inside the try so a
        # corrupt upload or an unexpected pipeline result produces the 400
        # below instead of an unhandled exception.
        image = Image.open(file_img.file)
        caption = interpreter(image)
        caption_text = caption[0]['generated_text']
    except Exception as exp:
        # Keep the client message generic but leave a trace in the server log.
        print(f"[Error] {exp}")
        return JSONResponse(content={"error": "Can't interpret the image "}, status_code=400)
    return JSONResponse(content={"caption": caption_text}, status_code=200)
def summerzation(file: UploadFile = File(...)):
    """Extract the text of an uploaded document and summarize it.

    The text is summarized in consecutive 1024-character slices with the
    module-level ``summarizer`` pipeline, and the partial summaries are
    joined with a single space.

    Args:
        file: Uploaded document. Accepted extensions (case-insensitive) are
            ``pdf``, ``docx``, ``pptx``, ``xlsx`` and ``xls``.

    Returns:
        JSONResponse with ``{"summary": ...}`` and status 200 on success;
        status 400 for an unsupported, oversized or empty file; status 403
        when the summarizer fails on a slice; status 500 for any other error.
    """
    # NOTE(review): no route decorator is visible on this handler - confirm
    # how it is registered with ``app``.
    try:
        # One table drives both the whitelist and the dispatch, so the two
        # can no longer disagree (the whitelist used to say "ppt" while the
        # dispatch tested "pptx", and "xls" had no extractor at all).
        extractors = {
            "pdf": get_text_from_PDF,
            "docx": get_text_from_DOC,
            "pptx": get_text_from_PPT,
            "xlsx": get_text_from_EXCEL,
            "xls": get_text_from_EXCEL,
        }
        extension = (file.filename or "").rsplit(".", 1)[-1].lower()
        extractor = extractors.get(extension)
        if extractor is None:
            return JSONResponse(content={"error": "Unsupported file type"}, status_code=400)
        # Measure the bytes actually received instead of relying on
        # ``file.size``, which may be None. Reading one byte past the limit
        # is enough to detect an oversized upload.
        file_bytes = file.file.read(MAX_SIZE + 1)
        if len(file_bytes) > MAX_SIZE:
            return JSONResponse(content={"error": "file is too large "}, status_code=400)
        text = extractor(file_bytes)
        if not text.strip():
            return JSONResponse(content={'error': 'File is emplty'}, status_code=400)
        partial_summaries = []
        for start in range(0, len(text), 1024):
            try:
                summary = summarizer(text[start:start + 1024], max_length=150, min_length=30, do_sample=False)
                partial_summaries.append(summary[0]['summary_text'])
            except Exception as e:
                return JSONResponse(content={"error": f"Summarization failed: {str(e)}"}, status_code=403)
        # A space between slices keeps the last word of one partial summary
        # from fusing with the first word of the next.
        return JSONResponse(content={"summary": " ".join(partial_summaries)}, status_code=200)
    except Exception as exp:
        return JSONResponse(content={"error": "Internel Server Error:" + str(exp)}, status_code=500)
# Matches the first ```python fenced block in the model's reply.
_CODE_FENCE_RE = re.compile(r'```python(.*?)```', re.DOTALL)


def _build_plot_prompt(df, user_need):
    """Build the chat messages (system rules + user task) sent to the code model."""
    return [
        {"role": "system", "content": f'''You are a helpful assistant that helps users write Python code.
-you have to write the code to solve the task using the dataset df.
-you can use pandas to manipulate the dataframe.
-you can use matplotlib to plot the data.
-you can use seaborn to plot the data.
-don't use print or input statements in the code.
-don't use any other libraries except pandas, matplotlib, seaborn.
-don't use any other functions except the ones provided in the libraries.
-don't write the code for the dataframe creation.
-check if the columns has a nan values and raise exception if yes .
-exclude plt.show() from the code.
-you have to write the code in a markdown code block.
-make sure that the type of the chart is compatible with the dtypes of the columns
-use only the column specified in the task.
-you have an information about the dataframe called df contains the following information:
df.columns:{df.columns.to_list()}
df.dtypes:{df.dtypes.to_dict()}
-you have to extract the column names and the plot type from the prompt bellow and use them in the code.
-if the user task is not clear or there is an error like the column names are not in the dataframe, raise an
error.
'''},
        {"role": "user", "content": user_need},
    ]


def _extract_plot_code(reply):
    """Return the code of the first ```python block in *reply*, or None if there is none."""
    match = _CODE_FENCE_RE.search(reply)
    if not match:
        return None
    # Drop whole ``plt.show()`` lines. Removing the substring "plt.show()\n"
    # would leave that line's indentation glued to the following line.
    kept_lines = [line for line in match.group(1).splitlines() if line.strip() != "plt.show()"]
    return "\n".join(kept_lines) + "\n"


def _current_figure_as_data_uri():
    """Save the current pyplot figure as PNG and return it as a base64 data URI."""
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    encoded = base64.b64encode(buf.getvalue()).decode('utf-8')
    return f"data:image/png;base64,{encoded}"


async def plot(user_need: str = Form(...), file: UploadFile = File(...)):
    """Generate plotting code for an uploaded spreadsheet, run it, and return the chart.

    The spreadsheet is loaded into a DataFrame ``df``. The module-level
    ``generator`` pipeline is asked for plotting code, which is then executed
    with ``plt``, ``sns``, ``pd`` and ``df`` in scope.

    Args:
        user_need: Free-text description of the wanted chart.
        file: Uploaded spreadsheet; ``xlsx`` and ``xls`` extensions are
            accepted (compared case-insensitively).

    Returns:
        JSONResponse with ``{"plot": <data URI>, "code": <executed code>}``
        and status 200 on success; status 400 for an unsupported or oversized
        file or when the generated code fails; status 403 when the reply has
        no python code block; status 500 for any other error.
    """
    # NOTE(review): no route decorator is visible on this handler - confirm
    # how it is registered with ``app``.
    try:
        extension = (file.filename or "").rsplit(".", 1)[-1].lower()
        if extension not in ("xlsx", "xls"):
            return JSONResponse(content={"error": "Unsupported file type"}, status_code=400)
        # Reading one byte past the limit is enough to detect an oversized upload.
        file_bytes = file.file.read(MAX_SIZE + 1)
        if len(file_bytes) > MAX_SIZE:
            return JSONResponse(content={"error": "too large file "}, status_code=400)
        df = pd.read_excel(io=io.BytesIO(file_bytes))
        output = generator(_build_plot_prompt(df, user_need), max_length=1000)
        # Assumes chat-style output where ``generated_text`` is the message
        # list with the model's reply last - TODO confirm against the
        # installed transformers version.
        reply = output[0]['generated_text'][-1]['content']
        code = _extract_plot_code(reply)
        if code is None:
            return JSONResponse(content={"error": "No Code was Generated"}, status_code=403)
        exec_globals = {
            "plt": plt,
            "sns": sns,
            "pd": pd,
            "df": df,
        }
        try:
            # SECURITY(review): this executes model-generated code that is
            # steered by user input. Passing a globals dict does NOT sandbox
            # it (builtins and imports remain available) - run it in an
            # isolated process or container before exposing this publicly.
            exec(code, exec_globals)
            return JSONResponse(content={"plot": _current_figure_as_data_uri(), 'code': code}, status_code=200)
        except Exception as e:
            print(e)
            return JSONResponse(content={"error": "Can't execute the code : " + str(e)}, status_code=400)
        finally:
            # pyplot keeps figures in global state; close them so this
            # request's drawing does not leak into the next one.
            plt.close("all")
    except Exception as exp:
        print(exp)
        return JSONResponse(content={"error": "Internel Server Error: " + str(exp)}, status_code=500)
def get_text_from_PDF(file_content):
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        file_content: Raw bytes of the PDF file.

    Returns:
        The page texts joined into one string.
    """
    # The context manager closes the document once the text is extracted;
    # the previous version left it open.
    with fitz.open(stream=file_content, filetype="pdf") as doc:
        return "".join(page.get_text() for page in doc)
def get_text_from_PPT(file_content):
    """Return the text of every shape that exposes ``.text``, one shape per line.

    Args:
        file_content: Raw bytes of the presentation file.

    Returns:
        The shape texts in slide order, separated by newlines.
    """
    prs = Presentation(io.BytesIO(file_content))
    # Newline-separated so the last word of one shape does not fuse with the
    # first word of the next.
    return "\n".join(
        shape.text
        for slide in prs.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    )
def get_text_from_DOC(file_content):
    """Return the text of every paragraph of a .docx document, one per line.

    Args:
        file_content: Raw bytes of the document file.

    Returns:
        The paragraph texts in document order, separated by newlines.
    """
    doc = Document(io.BytesIO(file_content))
    # Newline-separated so the end of one paragraph does not fuse with the
    # start of the next.
    return "\n".join(paragraph.text for paragraph in doc.paragraphs)
def get_text_from_EXCEL(file_content):
    """Load spreadsheet bytes with ``pd.read_excel`` and return the frame rendered as text.

    Args:
        file_content: Raw bytes of the spreadsheet file.

    Returns:
        The result of ``DataFrame.to_string()`` for the loaded frame.
    """
    workbook_stream = io.BytesIO(file_content)
    return pd.read_excel(io=workbook_stream).to_string()