from flask import Flask, request, jsonify import cv2 import numpy as np import tensorflow as tf from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel import torch import os import requests from tempfile import NamedTemporaryFile import tensorflow_hub as hub # Ensure that Hugging Face uses the appropriate cache directory os.environ['TRANSFORMERS_CACHE'] = '/app/cache' os.environ['HF_HOME'] = '/app/cache' movenet_model_path = '/models/movenet/movenet_lightning' # Check if the model path exists if not os.path.exists(movenet_model_path): # Download the model from TensorFlow Hub movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4") else: movenet_model = tf.saved_model.load(movenet_model_path) # Load BLIP model blip_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-large') blip_processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-large') # Load CLIP model clip_model = CLIPModel.from_pretrained('openai/clip-vit-large-patch14') clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-large-patch14') # Keypoint dictionary for reference KEYPOINT_DICT = { 'nose': 0, 'left_eye': 1, 'right_eye': 2, 'left_ear': 3, 'right_ear': 4, 'left_shoulder': 5, 'right_shoulder': 6, 'left_elbow': 7, 'right_elbow': 8, 'left_wrist': 9, 'right_wrist': 10, 'left_hip': 11, 'right_hip': 12, 'left_knee': 13, 'right_knee': 14, 'left_ankle': 15, 'right_ankle': 16 } app = Flask(__name__) @app.route('/process_video', methods=['POST']) def process_video(): try: # Get the video URL from the request video_url = request.json.get('videoURL') height = request.json.get('height') weight = request.json.get('weight') wingspan = request.json.get('wingspan') if not video_url: return jsonify({"error": "No video URL provided"}), 400 if not all([height, weight, wingspan]): return jsonify({"error": "Height, weight, and wingspan are required"}), 400 # Download the video from the S3 URL with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file: response = requests.get(video_url) if response.status_code != 200: return jsonify({"error": "Failed to download video from the provided URL"}), 400 temp_video_file.write(response.content) video_path = temp_video_file.name # Open the video file cap = cv2.VideoCapture(video_path) frames = [] # Extract 60 frames from the video success, frame = cap.read() frame_count = 0 while success and frame_count < 60: frames.append(frame) success, frame = cap.read() frame_count += 1 cap.release() os.remove(video_path) # Process each frame with MoveNet (to get 3D keypoints and detect stance) movenet_results = [] stances = [] hip_rotations = [] arm_extensions = [] stepping_jabs = [] guard_up = [] hand_returned = [] hips_width_apart = [] leg_angle_correct = [] punch_started = False initial_left_wrist = None initial_right_wrist = None for frame_index, frame in enumerate(frames): input_tensor = tf.image.resize_with_pad(tf.convert_to_tensor(frame, dtype=tf.uint8), 256, 256) input_tensor = tf.cast(input_tensor, dtype=tf.int32) # Cast to int32 instead of float32 input_tensor = tf.expand_dims(input_tensor, axis=0) keypoints = movenet_model.signatures['serving_default'](input_tensor) keypoints_3d = keypoints['output_0'][0].numpy().tolist() # Assuming the model returns 3D keypoints movenet_results.append(keypoints_3d) # Detect stance based on keypoints (using ankles and wrists) left_ankle = keypoints_3d[KEYPOINT_DICT['left_ankle']] right_ankle = keypoints_3d[KEYPOINT_DICT['right_ankle']] left_wrist = keypoints_3d[KEYPOINT_DICT['left_wrist']] right_wrist = keypoints_3d[KEYPOINT_DICT['right_wrist']] if right_ankle[0] < left_ankle[0] and right_wrist[0] < left_wrist[0]: stance = "orthodox" elif left_ankle[0] < right_ankle[0] and left_wrist[0] < right_wrist[0]: stance = "southpaw" else: stance = "unknown" stances.append(stance) # Detect if guard is up (both hands near eye level at the side of the head) nose = keypoints_3d[KEYPOINT_DICT['nose']] guard_threshold = 0.1 # Threshold distance to consider hands near the head left_hand_near_head = abs(left_wrist[1] - nose[1]) < guard_threshold right_hand_near_head = abs(right_wrist[1] - nose[1]) < guard_threshold guard_up.append(left_hand_near_head and right_hand_near_head) # Determine if the punch has started (based on wrist movement) if frame_index > 0: previous_left_wrist = movenet_results[frame_index - 1][KEYPOINT_DICT['left_wrist']] previous_right_wrist = movenet_results[frame_index - 1][KEYPOINT_DICT['right_wrist']] if stance == "orthodox" and (left_wrist[0] - previous_left_wrist[0]) > 0.05: punch_started = True if initial_left_wrist is None: initial_left_wrist = left_wrist elif stance == "southpaw" and (right_wrist[0] - previous_right_wrist[0]) > 0.05: punch_started = True if initial_right_wrist is None: initial_right_wrist = right_wrist # Detect hip rotation (based on left and right hips, considering stance and punch start) left_hip = keypoints_3d[KEYPOINT_DICT['left_hip']] right_hip = keypoints_3d[KEYPOINT_DICT['right_hip']] if punch_started: if stance == "orthodox": hip_rotation = right_hip[0] - left_hip[0] # Right hip should move forward elif stance == "southpaw": hip_rotation = left_hip[0] - right_hip[0] # Left hip should move forward else: hip_rotation = 0 else: hip_rotation = 0 hip_rotations.append(hip_rotation) # Detect full arm extension (based on shoulder, elbow, and wrist, considering stance) left_shoulder = keypoints_3d[KEYPOINT_DICT['left_shoulder']] left_elbow = keypoints_3d[KEYPOINT_DICT['left_elbow']] right_shoulder = keypoints_3d[KEYPOINT_DICT['right_shoulder']] right_elbow = keypoints_3d[KEYPOINT_DICT['right_elbow']] if stance == "orthodox": lead_arm_extension = np.linalg.norm(np.array(left_wrist) - np.array(left_shoulder)) elif stance == "southpaw": lead_arm_extension = np.linalg.norm(np.array(right_wrist) - np.array(right_shoulder)) else: lead_arm_extension = 0 arm_extensions.append(lead_arm_extension) # Detect stepping with the jab and coming back (based on ankles, considering stance and punch start) if punch_started and frame_index > 0: previous_left_ankle = movenet_results[frame_index - 1][KEYPOINT_DICT['left_ankle']] previous_right_ankle = movenet_results[frame_index - 1][KEYPOINT_DICT['right_ankle']] if stance == "orthodox": step_movement = (left_ankle[0] - previous_left_ankle[0]) > 0.05 # Lead foot is left elif stance == "southpaw": step_movement = (right_ankle[0] - previous_right_ankle[0]) > 0.05 # Lead foot is right else: step_movement = False stepping_jabs.append(step_movement) else: stepping_jabs.append(False) # Detect if the hand returns to the initial position after the punch if punch_started: if stance == "orthodox" and initial_left_wrist is not None: hand_returned.append(np.linalg.norm(np.array(left_wrist) - np.array(initial_left_wrist)) < 0.05) elif stance == "southpaw" and initial_right_wrist is not None: hand_returned.append(np.linalg.norm(np.array(right_wrist) - np.array(initial_right_wrist)) < 0.05) else: hand_returned.append(False) else: hand_returned.append(False) # Detect if hips are shoulder width apart left_shoulder = keypoints_3d[KEYPOINT_DICT['left_shoulder']] right_shoulder = keypoints_3d[KEYPOINT_DICT['right_shoulder']] shoulder_width = abs(left_shoulder[0] - right_shoulder[0]) hips_width = abs(left_hip[0] - right_hip[0]) hips_width_apart.append(hips_width > 0.9 * shoulder_width and hips_width < 1.1 * shoulder_width) # Detect if the back leg is at a 45 degree angle outward (for orthodox and southpaw) if stance == "orthodox": right_leg_angle = np.arctan2(right_ankle[1] - right_hip[1], right_ankle[0] - right_hip[0]) * 180 / np.pi leg_angle_correct.append(40 <= right_leg_angle <= 50) elif stance == "southpaw": left_leg_angle = np.arctan2(left_ankle[1] - left_hip[1], left_ankle[0] - left_hip[0]) * 180 / np.pi leg_angle_correct.append(40 <= left_leg_angle <= 50) else: leg_angle_correct.append(False) # Generate captions for all 60 frames using BLIP captions = [] for frame in frames: inputs = blip_processor(images=frame, return_tensors="pt") with torch.no_grad(): caption = blip_model.generate(**inputs) captions.append(blip_processor.decode(caption[0], skip_special_tokens=True)) # Use CLIP to assess the similarity of frames to a Muay Thai jab prompt, including stance clip_results = [] for i, frame in enumerate(frames): stance = stances[i] prompt = f"A person performing a Muay Thai jab in {stance} stance at {height} in in height, {weight} lbs in weight, and a wingspan of {wingspan} cm, with hip rotation of {hip_rotations[i]:.2f}, arm extension of {arm_extensions[i]:.2f}, {'stepping forward' if stepping_jabs[i] else 'not stepping'}, {'guard up' if guard_up[i] else 'guard down'}, {'hand returned to initial position' if hand_returned[i] else 'hand not returned'}, {'hips shoulder width apart' if hips_width_apart[i] else 'hips not shoulder width apart'}, and {'correct leg angle' if leg_angle_correct[i] else 'incorrect leg angle'}" text_inputs = clip_processor(text=[prompt], return_tensors="pt") image_inputs = clip_processor(images=frame, return_tensors="pt") with torch.no_grad(): image_features = clip_model.get_image_features(**image_inputs) text_features = clip_model.get_text_features(**text_inputs) similarity = torch.nn.functional.cosine_similarity(image_features, text_features) clip_results.append(similarity.item()) # Calculate score based on CLIP results and BLIP captions avg_clip_similarity = sum(clip_results) / len(clip_results) if clip_results else 0 guard_score = sum(guard_up) / len(guard_up) if guard_up else 0 hand_return_score = sum(hand_returned) / len(hand_returned) if hand_returned else 0 hips_width_score = sum(hips_width_apart) / len(hips_width_apart) if hips_width_apart else 0 leg_angle_score = sum(leg_angle_correct) / len(leg_angle_correct) if leg_angle_correct else 0 overall_score = (avg_clip_similarity + guard_score + hand_return_score + hips_width_score + leg_angle_score) / 5 # Scale the overall score to a range of 0 - 10 overall_score = max(0, min(overall_score * 10, 10)) # Return combined results response = { "movenet_results": movenet_results, "blip_captions": captions, "clip_similarities": clip_results, "stances": stances, "hip_rotations": hip_rotations, "arm_extensions": arm_extensions, "stepping_jabs": stepping_jabs, "hips_width_apart": hips_width_apart, "leg_angle_correct": leg_angle_correct, "overall_score": overall_score, "guard_score": guard_score, "hand_return_score": hand_return_score, "hips_width_score":hips_width_score, "leg_angle_score": leg_angle_score, } return jsonify(response) except Exception as e: return jsonify({"error": str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)