Spaces:
Running
Running
import cv2 | |
from mtcnn.mtcnn import MTCNN | |
from utils import * | |
# Example galleries and helper assets, resolved once at import time through
# the helpers pulled in by ``from utils import *``.
# The two garment lists differ only in the ``hr`` flag passed to the helper
# (presumably 0 = standard, 1 = high resolution -- confirm in utils).
cloth_examples, cloth_hr_examples = (
    get_cloth_examples(hr=0),
    get_cloth_examples(hr=1),
)
pose_examples = get_pose_examples()
tip1, tip2 = get_tips()

# A single MTCNN detector instance shared by every call to the click handler.
face_detector = MTCNN()
# Page header and promotional blurb, rendered with gr.Markdown at the top of
# the UI. Both strings start and end with a newline.
title = (
    "\n"
    '<h1 align="center">Outfit Anyway: Best customer try-on You ever See</h1>'
    "\n"
)
description = (
    "\n"
    "<b>Join discord to know more about </b> "
    "<a href='https://discord.com/invite/QgJWCtSG58' target='_blank'>"
    "<b> heybeauty prebuy vton solution</b></a>.<br>"
    "\n"
)
def onClick(cloth_image, pose_image, high_resolution, request: gr.Request):
    """Validate the inputs, submit a try-on task and stream its progress.

    This is a generator used as the Run button's event handler: every
    yielded ``(image, status_text, markdown)`` triple updates the three
    outputs wired to the button. A value passed to ``return`` inside a
    generator is not delivered as an output, so every exit path yields its
    final message first and then returns bare.

    Args:
        cloth_image: File path of the selected garment. The file stem must
            be an integer; it is sent to the backend as the cloth id.
        pose_image: File path of the person photo.
        high_resolution: Truthy to submit the task with ``is_hr=1``,
            otherwise ``is_hr=0``.
        request: Gradio request object, used to obtain the client address.

    Yields:
        tuple: ``(image_or_None, status_text, "")``.

    Raises:
        ValueError: If the garment file stem is not an integer.
        Exception: Any error raised while validating, uploading, submitting
            or polling is printed and re-raised unchanged.
    """
    if pose_image is None:
        yield None, "no pose image found !", ""
        return
    if cloth_image is None:
        yield None, "no cloth image found !", ""
        return

    cloth_id = int(os.path.basename(cloth_image).split(".")[0])

    try:
        # Prefer the forwarded address when the app sits behind a proxy.
        # NOTE(review): the header value is used verbatim; it may hold a
        # comma-separated chain of addresses -- confirm the helpers expect that.
        client_ip = request.client.host
        x_forwarded_for = dict(request.headers).get('x-forwarded-for')
        if x_forwarded_for:
            client_ip = x_forwarded_for

        # cv2.imread signals an unreadable/unsupported file by returning
        # None instead of raising, so check before slicing the array.
        pose_np = cv2.imread(pose_image)
        if pose_np is None:
            print(client_ip, 'failed to read pose image', flush=True)
            yield None, "Fatal Error !!! Failed to read the photo, please upload a valid image file!!!", ""
            return

        # OpenCV loads BGR; reverse the channel axis before detection.
        faces = face_detector.detect_faces(pose_np[:, :, ::-1])
        if not faces:
            print(client_ip, 'faces num is 0! ', flush=True)
            yield None, "Fatal Error !!! No face detected !!! You must upload a human photo!!! Not clothing photo!!!", ""
            return

        # Reject close-up portraits: the first detected face may not span
        # more than 1/3.3 of the image in either dimension.
        _, _, w, h = faces[0]["box"]
        H, W = pose_np.shape[:2]
        max_face_ratio = 1 / 3.3
        if w / W > max_face_ratio or h / H > max_face_ratio:
            yield None, "Fatal Error !!! Headshot is not allowed !!! You must upload a full-body or half-body photo!!!", ""
            return

        if not check_region_warp(client_ip):
            yield None, "Failed !!! Our server is under maintenance, please try again later", ""
            return

        yield None, "begin to upload ", ""
        # Task id: the digits of the current timestamp plus a random offset.
        timeId = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999)
        upload_url = upload_pose_img(client_ip, timeId, pose_image)
        # Covers both an empty string and None, and is checked before the
        # next stage is announced.
        if not upload_url:
            yield None, "fail to upload", ""
            return

        yield None, "begin to public task ", ""
        public_res = publicClothSwap(upload_url, cloth_id, is_hr=1 if high_resolution else 0)
        if public_res is None:
            yield None, "fail to public you task", ""
            return

        print(client_ip, public_res['mid_result'])
        yield public_res['mid_result'], f"task is processing, task id: {public_res['id']}, {public_res['msg']}", ""

        # Poll for up to max_try * wait_s seconds (360 * 0.5 s = 3 minutes).
        max_try = 120 * 3
        wait_s = 0.5
        for i in range(max_try):
            time.sleep(wait_s)
            state = getInfRes(public_res['id'])
            # A changing query string makes each yielded URL distinct.
            timestamp = int(time.time() * 1000)
            preview = public_res['mid_result'] + f"?t={timestamp}"
            if state is None:
                yield preview, "task query failed,", ""
            elif state['status'] == 'PROCESSING':
                yield preview, f"task is processing, query {i}", ""
            elif state['status'] == 'SUCCEED':
                yield state['output1'] + f"?t={timestamp}", f"task finished, {state['msg']}", ""
                return
            elif state['status'] == 'FAILED':
                yield None, f"task failed, {state['msg']}", ""
                return
            else:
                yield preview, f"task is on processing, query {i}", ""

        # Polling budget exhausted. This message must be yielded: a plain
        # ``return <value>`` here would never be shown to the user.
        yield None, "no machine...", ""
    except Exception as e:
        # Log, then let the framework surface the error; a bare ``raise``
        # keeps the original traceback.
        print(e)
        raise
# UI layout: header, collapsible upload tips, then one row with three columns
# (garment picker, photo picker, controls + result).
# NOTE(review): the source lost its indentation; the nesting below is
# reconstructed from the order of the statements -- confirm against the
# deployed layout.
with gr.Blocks() as demo:
    gr.Markdown(title)
    gr.Markdown(description)

    with gr.Accordion('upload tips', open=False):
        with gr.Row():
            gr.HTML('<img src="{}" >'.format(tip1))
            gr.HTML('<img src="{}" >'.format(tip2))

    with gr.Row():
        # Column 1: garment selection (not editable; filled from examples).
        with gr.Column():
            cloth_image = gr.Image(
                label="choose a clothing",
                type="filepath",
                value=None,
                interactive=False,
            )
            example = gr.Examples(
                examples=cloth_examples,
                inputs=cloth_image,
                examples_per_page=20,
                label="clothing",
            )
            hr_example = gr.Examples(
                examples=cloth_hr_examples,
                inputs=cloth_image,
                examples_per_page=9,
                label="invalid clothing",
            )

        # Column 2: person photo, uploaded or picked from examples.
        with gr.Column():
            pose_image = gr.Image(
                label="choose/upload a photo",
                type="filepath",
                value=None,
            )
            example_pose = gr.Examples(
                examples=pose_examples,
                inputs=pose_image,
                examples_per_page=20,
            )

        # Column 3: run controls, status text and the result image.
        with gr.Column():
            with gr.Column():
                high_resolution = gr.Checkbox(
                    label="high resolution",
                    value=False,
                    interactive=True,
                )
                run_button = gr.Button(value="Run")
                info_text = gr.Textbox(
                    label='runtime information',
                    value="",
                    interactive=False,
                )
                res_image = gr.Image(
                    label="result image",
                    value=None,
                    type="filepath",
                )

    MK01 = gr.Markdown()

    # The handler is a generator; each yielded triple maps onto these outputs.
    run_button.click(
        fn=onClick,
        inputs=[cloth_image, pose_image, high_resolution],
        outputs=[res_image, info_text, MK01],
    )
if __name__ == "__main__":
    # Enable the request queue with max_size=50, then serve on all network
    # interfaces (queue() returns the Blocks instance, so the calls chain).
    demo.queue(max_size=50).launch(server_name="0.0.0.0")