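# Flask + Socket.IO server that runs MediaPipe Pose on incoming frames and
# streams per-landmark positions, velocities, and accelerations to clients.
# Frames arrive either over a Socket.IO event or from an uploaded video that
# is replayed as an MJPEG stream.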
from flask import Flask, render_template, Response, request, jsonify
from flask_socketio import SocketIO
from flask_cors import CORS
import mediapipe as mp
import cv2
import numpy as np
import time
import os

app = Flask(__name__)
socketio = SocketIO(app)
CORS(app)

mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
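
# State carried over from the previous frame, used to estimate per-landmark
# velocity and acceleration between consecutive frames.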
previous_keypoints = None
previous_velocities = None
previous_time = time.time()
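
# NOTE: the route and Socket.IO decorators below were missing from the
# extracted source; the URL paths and the event name are assumptions.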
@app.route('/')
def index():
    return render_template('index.html')


@app.route('/holistic')  # assumed path
def web_app():
    return render_template('holistic.html')


@app.route('/video_app')  # assumed path
def video_app():
    return render_template('video_app.html')
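
# Run MediaPipe Pose on a BGR frame and derive per-landmark speed and
# acceleration from the change since the previous processed frame.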
def process_frame(frame):
    global previous_keypoints, previous_time, previous_velocities
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)
    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark
        keypoints = [(lm.x, lm.y, lm.z) for lm in landmarks]
        current_time = time.time()
        # Guard against a zero time step to avoid division by zero.
        delta_time = max(current_time - previous_time, 1e-6)
        velocities = []
        accelerations = []
        if previous_keypoints:
            for i, lm in enumerate(landmarks):
                dx = lm.x - previous_keypoints[i][0]
                dy = lm.y - previous_keypoints[i][1]
                dz = lm.z - previous_keypoints[i][2]
                speed = np.sqrt(dx ** 2 + dy ** 2 + dz ** 2) / delta_time
                velocities.append(speed)
                acceleration = (speed - previous_velocities[i]) / delta_time if previous_velocities else 0
                accelerations.append(acceleration)
        previous_keypoints = keypoints
        previous_velocities = velocities
        previous_time = current_time
        return {
            'landmarks': keypoints,
            'velocities': velocities,
            'accelerations': accelerations
        }
    return None
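
# Socket.IO handler: decode an incoming JPEG frame and broadcast the computed
# pose data to connected clients.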
@socketio.on('process_frame')  # assumed event name
def handle_process_frame(data):
    frame = cv2.imdecode(np.frombuffer(data['frame'], np.uint8), cv2.IMREAD_COLOR)
    result = process_frame(frame)
    if result:
        socketio.emit('pose_data', result)
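
# Accept a video upload and store it for later playback via the video feed route.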
@app.route('/upload_video', methods=['POST'])  # assumed path
def upload_video():
    file = request.files.get('video')
    if file:
        upload_folder = os.path.join(app.root_path, 'static', 'uploads')
        os.makedirs(upload_folder, exist_ok=True)
        video_path = os.path.join(upload_folder, 'temp_video.mp4')
        file.save(video_path)
        return jsonify(success=True, message='Video uploaded successfully')
    return jsonify(success=False, message='No video file received'), 400
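
# Stream the uploaded video back as an MJPEG response while emitting pose data
# for each frame over Socket.IO.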
@app.route('/video_feed')  # assumed path
def video_feed():
    def generate_frames():
        video_path = os.path.join(app.root_path, 'static', 'uploads', 'temp_video.mp4')
        if not os.path.exists(video_path):
            return
        cap = cv2.VideoCapture(video_path)
        while True:
            success, frame = cap.read()
            if not success:
                break
            result = process_frame(frame)
            if result:
                socketio.emit('pose_data', result)
            ret, buffer = cv2.imencode('.jpg', frame)
            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        cap.release()
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    os.makedirs(os.path.join(app.root_path, 'static', 'uploads'), exist_ok=True)
    socketio.run(app, host='0.0.0.0', port=7860, debug=True, allow_unsafe_werkzeug=True)
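
# --- Client usage sketch (kept as comments; not part of the app) ---
# A minimal client that sends a single webcam frame to the server and listens
# for the resulting pose data. This is an illustrative sketch only: it assumes
# the python-socketio client package, that the inbound event is named
# 'process_frame', and that the server expects JPEG bytes under the 'frame'
# key; none of these is confirmed beyond what the handler above suggests.
#
#   import socketio
#   import cv2
#
#   sio = socketio.Client()
#
#   @sio.on('pose_data')
#   def on_pose_data(data):
#       print('landmarks received:', len(data['landmarks']))
#
#   sio.connect('http://localhost:7860')
#   cap = cv2.VideoCapture(0)
#   ok, frame = cap.read()
#   if ok:
#       ok, buf = cv2.imencode('.jpg', frame)
#       sio.emit('process_frame', {'frame': buf.tobytes()})
#   sio.wait()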