|
|
import os |
|
|
import gradio as gr |
|
|
from duckduckgo_search import DDGS |
|
|
import sqlite3 |
|
|
import json |
|
|
import requests |
|
|
from typing import List, Dict, Optional |
|
|
import time |
|
|
from bs4 import BeautifulSoup |
|
|
import urllib.parse |
|
|
|
|
|
|
|
|
|
|
|
# Base URL of the text-generation endpoint. The SPECIALTOKEN env var is
# expected to hold the API URL prefix (including any access token).
# Bug fix: os.getenv() returns None when the variable is unset, which made
# this line crash with a TypeError at import time; fall back to "" so the
# module still loads (per-request failures then surface with a clear error).
specialtoken = (os.getenv("SPECIALTOKEN") or "") + "models/" + "openai-large"
|
|
|
|
|
|
|
|
|
|
|
# Prompt sent to the LLM endpoint. {plant_name} and {content} are substituted
# via str.format() before the whole prompt is URL-encoded and appended to the
# request URL. The template asks for one flat JSON object with a fixed key set
# and string-only values so responses can be concatenated downstream.
PROMPT_TEMPLATE = """Extract detailed information about a plant from any reliable source. Provide the information in a JSON object with the following keys: ["Name", "Scientific Name", "Alternate Names", "Description", "Plant Family", "Origin", "Growth Habitat", "Active Components", "Treatable Conditions", "Preparation Methods", "Dosage", "Duration", "Contraindications", "Side Effects", "Interactions", "Part Used", "Harvesting Time", "Storage Tips", "Images", "Related Videos", "Sources"].




Use only string values for each key. If any information is missing, set the value as an empty string (""). Format the output as a valid JSON object with proper syntax.




Example: For the plant 'Name', provide the information accordingly.




---




**Plant Name:** {plant_name}

Additional source of information about that plant: '''{content}'''




Please generate the JSON output for the above plant, ensuring it adheres to the specified keys and format.

"""
|
|
|
|
|
def fetch_page_content(url: str) -> str:
    """Fetch *url* and return its visible text, truncated to 3500 characters.

    Scripts, styles, and boilerplate chrome (header/footer/nav) are stripped
    before text extraction. Returns "No information found!" for known error
    pages and an "Error fetching page: ..." message on any request or parsing
    failure — callers always get a string, never an exception.
    """
    try:
        # Some hosts reject the default python-requests User-Agent; send a
        # browser-like one so more pages return real content.
        response = requests.get(
            url,
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0 (compatible; PlantInfoBot/1.0)"},
        )
        # Bug fix: the HTTP status was never checked, so 404/500 error pages
        # were parsed as if they were content. Treat them as fetch failures.
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Drop non-content elements before extracting the text.
        for element in soup(['script', 'style', 'header', 'footer', 'nav']):
            element.decompose()

        text = soup.get_text(separator='\n', strip=True)

        # Heuristic: some hosts return 200 OK with an error body.
        for marker in ("Page not available", "403 Forbidden"):
            if marker in text:
                return "No information found!"
        return text[:3500]
    except Exception as e:
        return f"Error fetching page: {str(e)}"
|
|
|
|
|
def search_full_plant_information(plant_name: str) -> str:
    """Search the web for *plant_name* and ask the LLM for a JSON summary.

    Runs a DuckDuckGo text search (retrying with exponential backoff on rate
    limits), enriches each hit with the fetched page text, formats everything
    through PROMPT_TEMPLATE, and GETs the model endpoint with the URL-encoded
    prompt. Falls back to a context-free prompt when the search yields
    nothing. Returns the raw model response text.
    """
    query = (
        f"{plant_name} plant medicinal uses scientific information "
        "site:.edu OR site:.gov OR site:.org"
    )

    search_results = ""
    for attempt in range(3):
        try:
            search_results = DDGS().text(keywords=query, max_results=5)
            break  # bug fix: the original kept re-querying even after success
        except Exception as e:
            if "Ratelimit" in str(e):
                wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s
                print(f"Rate limit hit, retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            # Any other error: fall through and retry immediately.

    if search_results:
        # Build the model context from result snippets plus full page text.
        content = ""
        for result in search_results:
            content += result['body'] + " "
            content += fetch_page_content(result['href']) + " "
            time.sleep(2)  # be polite to the crawled sites
    else:
        content = "Get any information from any source about:" + plant_name

    # The endpoint takes the whole prompt as a URL path segment, so it must
    # be percent-encoded. (The two near-identical request branches of the
    # original are merged; only the `content` source differed.)
    prompt = PROMPT_TEMPLATE.format(plant_name=plant_name, content=content)
    prompt = urllib.parse.quote(prompt)
    response = requests.get(f"{specialtoken}/{prompt}", timeout=60)
    return response.text
|
|
|
|
|
# SQLite database file used by save_to_db() and get_all_plants(); it is
# expected to contain a `plants` table (schema not created by this script).
DB_NAME="plants.db"
|
|
def save_to_db(plant_data: Dict) -> bool:
    """Insert one plant record (field name -> value) into the `plants` table.

    Known multi-value fields given as lists are flattened to comma-separated
    strings; dict keys are lower_snake_cased into column names. Returns True
    on success and False on any failure (the error is printed, not raised).
    Note: mutates *plant_data* in place when normalising field values.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_NAME)
        cursor = conn.cursor()

        # Normalise multi-value fields to plain strings.
        for field in ["Alternate Names", "Active Components", "Treatable Conditions",
                      "Preparation Methods", "Contraindications", "Side Effects",
                      "Interactions"]:
            if field in plant_data:
                if isinstance(plant_data[field], list):
                    plant_data[field] = ", ".join(plant_data[field])
                elif not isinstance(plant_data[field], str):
                    plant_data[field] = str(plant_data[field])

        columns = []
        values = []
        for key, value in plant_data.items():
            if key.lower() == "error":
                continue  # error markers are not data
            column = key.lower().replace(" ", "_")
            # Security fix: column names cannot be bound as SQL parameters,
            # so reject anything that is not a plain identifier to prevent
            # SQL injection through dict keys (values below ARE parameterised).
            if not column.replace("_", "").isalnum():
                continue
            columns.append(column)
            values.append(str(value) if value else None)

        if not columns:
            return False  # nothing valid to insert; avoid malformed SQL

        columns_str = ", ".join(columns)
        placeholders = ", ".join(["?"] * len(columns))

        cursor.execute(
            f"INSERT INTO plants ({columns_str}) VALUES ({placeholders})",
            values
        )

        conn.commit()
        return True
    except Exception as e:
        print(f"Database save error: {e}")
        return False
    finally:
        # Bug fix: the original leaked the connection on any failure path.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
|
|
def process_plants(plants_array: List[str]) -> str:
    """Fetch AI-processed information for each plant name and return it.

    Blank entries are skipped. A 2-second pause between plants avoids
    hammering the search and model endpoints.

    Bug fixes vs. the original: results are actually collected (the list was
    never appended to), and the joined text is returned instead of the None
    produced by `return print(results)`.
    """
    results = []
    for plant in plants_array:
        plant = plant.strip()
        if not plant:
            continue

        print(f"Processing {plant}...")
        plant_data = search_full_plant_information(plant)
        if plant_data:
            results.append(plant_data)

        time.sleep(2)  # gentle pacing between plants

    return "\n".join(results)
|
|
|
|
|
def split_and_search(text: str) -> str:
    """Process a comma-separated list of plant names into concatenated JSON.

    Each name is summarised via search_full_plant_information(); markdown
    code fences are stripped and anything after a '---' separator is dropped.
    Names that yield (almost) no data get a placeholder JSON object with the
    full key set so the output shape stays uniform. Returns the concatenated
    results as one string.
    """
    all_data = ""
    for each in text.split(","):
        name = each.strip()
        sp = (search_full_plant_information(name)
              .replace('```json', '')
              .replace('```', '')
              .split('---')[0]) + ",\n"

        if len(sp) < 10:
            # Bug fix: this f-string referenced an undefined `plant_name`
            # variable, so the fallback path always raised NameError; it now
            # uses the stripped name from this iteration.
            sp = "{" + f'''
"Name": "No data for {name}",
"Scientific Name": "",
"Alternate Names": "",
"Description": "",
"Plant Family": "",
"Origin": "",
"Growth Habitat": "",
"Active Components": "",
"Treatable Conditions": "",
"Preparation Methods": "",
"Dosage": "",
"Duration": "",
"Contraindications": "",
"Side Effects": "",
"Interactions": "",
"Part Used": "",
"Harvesting Time": "",
"Storage Tips": "",
"Images": "",
"Related Videos": "",
"Sources": ""''' + "}"

        all_data += sp
        time.sleep(1)  # gentle pacing between plants

    return all_data
|
|
|
|
|
|
|
|
def get_all_plants() -> List[Dict]:
    """Return every row of the `plants` table, newest first, as dicts.

    Each dict includes the SQLite rowid alongside the table columns. On any
    database error a single-element error list is returned instead of raising.
    """
    try:
        connection = sqlite3.connect(DB_NAME)
        connection.row_factory = sqlite3.Row  # rows become dict-convertible
        rows = connection.cursor().execute(
            "SELECT `_rowid_`,* FROM plants ORDER BY `_rowid_` DESC"
        ).fetchall()
        records = [dict(row) for row in rows]
        connection.close()
        return records
    except Exception as err:
        print(f"Database retrieval error: {err}")
        return [{"Error": "Failed to retrieve data from database"}]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Gradio UI --------------------------------------------------------------
# Two tabs: one runs the search + LLM pipeline for comma-separated plant
# names; the other displays rows already stored in the SQLite database.
with gr.Blocks(title="AI-Powered Medicinal Plants Database") as app:

    gr.Markdown("# ๐ฟ AI-Powered Medicinal Plants Database")

    with gr.Tab("Fetch & Process Plants"):
        gr.Markdown("### Enter plant names (comma separated)")
        with gr.Row():
            plant_input = gr.Textbox(label="Plant Names",
                                   placeholder="e.g., Neem, Peppermint, Aloe Vera")
            fetch_btn = gr.Button("Process Plants", variant="primary")

        # Raw model output (concatenated JSON objects) shown read-only.
        output_area = gr.Textbox(label="AI-Processed Results", lines=12, interactive=False)

        # Clicking the button feeds the textbox through split_and_search().
        fetch_btn.click(
            fn=split_and_search,
            inputs=plant_input,
            outputs=output_area
        )

    with gr.Tab("View Database"):
        gr.Markdown("### Stored Plant Information")
        with gr.Row():
            refresh_btn = gr.Button("Refresh Data", variant="secondary")
            # NOTE(review): this button has no .click() handler anywhere in
            # this file — clicking it does nothing. Confirm whether a
            # database-clearing callback was intended.
            clear_db = gr.Button("Clear Database", variant="stop")

        # NOTE(review): get_all_plants() returns dicts with many more columns
        # than the four declared here — verify the table renders as intended.
        db_table = gr.Dataframe(
            headers=["id", "name", "scientific_name", "description"],
            datatype=["number", "str", "str", "str"],
            col_count=(4, "fixed"),
            interactive=True
        )

        refresh_btn.click(
            fn=get_all_plants,
            outputs=db_table
        )

if __name__ == "__main__":
    # Launch the local Gradio server (debug logging on, no public share link).
    app.launch(debug=True, share=False)