Fix video tempfile deletion prior to close #311

Open
fattank opened this issue Apr 20, 2024 · 1 comment


fattank commented Apr 20, 2024

I downloaded the project on April 19th. When I ran the video interpretation to generate a web page, I got the error "Error generating code. Check the Developer Console AND the backend logs for details. Feel free to open a Github issue." I checked the temp folder and did not find the expected mp4 file, so I kept troubleshooting. In the code I found that the temp file is deleted automatically, which explains why there is no mp4 file. I disabled the automatic deletion and added manual deletion code, which solved the problem. The code follows.

backend\video\utils.py

import base64
import io
import mimetypes
import os
import tempfile
import uuid
from typing import Any, Union, List, cast
from moviepy.editor import VideoFileClip # type: ignore
from PIL import Image
import math

DEBUG = True
TARGET_NUM_SCREENSHOTS = 20

async def assemble_claude_prompt_video(video_data_url: str) -> list[Any]:
    images = split_video_into_screenshots(video_data_url)

    if DEBUG:
        save_images_to_tmp(images)

    print(f"Number of frames extracted from video: {len(images)}")
    if len(images) > TARGET_NUM_SCREENSHOTS:
        print(f"Too many screenshots: {len(images)}")
        raise ValueError("Too many screenshots extracted from video")

    # Encode each frame as a base64 JPEG image block
    content_messages: list[dict[str, Union[dict[str, str], str]]] = []
    for image in images:
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
        media_type = "image/jpeg"

        content_messages.append(
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        )

    return [
        {
            "role": "user",
            "content": content_messages,
        },
    ]

def split_video_into_screenshots(video_data_url: str) -> list[Image.Image]:
    # Decode the base64 payload of the data URL
    video_encoded_data = video_data_url.split(",")[1]
    video_bytes = base64.b64decode(video_encoded_data)
    mime_type = video_data_url.split(";")[0].split(":")[1]
    suffix = mimetypes.guess_extension(mime_type)
    temp_file_path = None

    try:
        # delete=False: closing the file must not remove it, because
        # VideoFileClip opens it again by path afterwards
        temp_video_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        temp_file_path = temp_video_file.name
        temp_video_file.write(video_bytes)
        temp_video_file.flush()
        temp_video_file.close()
        print(f"Temporary file created and flushed at {temp_file_path}")

        clip = VideoFileClip(temp_file_path)
        try:
            images: list[Image.Image] = []
            total_frames = cast(int, clip.reader.nframes)  # type: ignore

            # Keep every frame_skip-th frame, up to TARGET_NUM_SCREENSHOTS
            frame_skip = max(1, math.ceil(total_frames / TARGET_NUM_SCREENSHOTS))
            for i, frame in enumerate(clip.iter_frames()):
                if i % frame_skip == 0:
                    frame_image = Image.fromarray(frame)
                    images.append(frame_image)
                    if len(images) >= TARGET_NUM_SCREENSHOTS:
                        break
        finally:
            # Release the file handle before deleting, even if reading fails
            clip.close()
    finally:
        # Manual cleanup replaces the automatic deletion
        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
            print(f"Temporary file {temp_file_path} deleted successfully")

    return images

def save_images_to_tmp(images: List[Image.Image]):
    # Write the frames to a uniquely named directory under the system temp dir
    unique_dir_name = f"screenshots_{uuid.uuid4()}"
    tmp_screenshots_dir = os.path.join(tempfile.gettempdir(), unique_dir_name)
    os.makedirs(tmp_screenshots_dir, exist_ok=True)
    for idx, image in enumerate(images):
        image_filename = f"screenshot_{idx}.jpg"
        tmp_filepath = os.path.join(tmp_screenshots_dir, image_filename)
        image.save(tmp_filepath, format="JPEG")
    print(f"Saved to {tmp_screenshots_dir}")

def extract_tag_content(tag: str, text: str) -> str:
    # Return the text between <tag> and </tag>, or "" if either is missing
    tag_start = f"<{tag}>"
    tag_end = f"</{tag}>"
    start_idx = text.find(tag_start)
    end_idx = text.find(tag_end, start_idx)
    return text[start_idx + len(tag_start):end_idx] if start_idx != -1 and end_idx != -1 else ""
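For anyone following the frame sampling in split_video_into_screenshots, here is a minimal sketch of the index arithmetic on its own, without moviepy. The helper name sampled_frame_indices is hypothetical and not part of the repo, and it assumes the reported frame count matches the number of frames actually iterated (in practice clip.reader.nframes may be an estimate).

```python
import math


def sampled_frame_indices(total_frames: int, target: int = 20) -> list[int]:
    """Return the frame indices the sampling loop would keep (illustrative helper)."""
    # Same step size as in the code above: round up so that at most
    # `target` frames are kept across the whole clip.
    frame_skip = max(1, math.ceil(total_frames / target))
    indices = [i for i in range(total_frames) if i % frame_skip == 0]
    # Mirrors the early `break` once `target` frames have been collected.
    return indices[:target]


print(len(sampled_frame_indices(95)))   # 19 (step of 5: frames 0, 5, ..., 90)
print(len(sampled_frame_indices(100)))  # 20 (step of 5: frames 0, 5, ..., 95)
print(len(sampled_frame_indices(10)))   # 10 (step of 1: every frame)
```

Because the step is rounded up, the number of kept frames cannot exceed the target when the frame count is accurate, so the "Too many screenshots" check acts as a safety net rather than an expected path.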

Owner

abi commented Apr 22, 2024

@fattank thank you for sharing this. I'll look into incorporating this into the repo.

I do think my code is wrong here, since the docs say:

"If delete is true (the default) and delete_on_close is true (the default), the file is deleted as soon as it is closed. If delete is true and delete_on_close is false, the file is deleted on context manager exit only, or else when the file-like object is finalized."

Calling .close() before constructing VideoFileClip is probably deleting the temp file before it can be read.
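The behavior described in the docs can be checked in isolation with a small sketch that uses only the standard library. The helper name path_survives_close and the placeholder bytes are made up for illustration; no real video or moviepy call is involved.

```python
import os
import tempfile


def path_survives_close(delete: bool) -> bool:
    """Create a named temp file, close it, and report whether its path still exists."""
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=delete)
    path = tmp.name
    tmp.write(b"not a real video")  # placeholder bytes, for illustration only
    tmp.flush()
    tmp.close()  # with delete=True (the default) this also removes the file
    exists = os.path.exists(path)
    if exists:
        os.remove(path)  # manual cleanup, as in the delete=False workaround
    return exists


print(path_survives_close(delete=True))   # False: the file is gone once closed
print(path_survives_close(delete=False))  # True: the file stays until removed by hand
```

With delete=True, any reader that opens the path after .close() finds nothing there, which matches the missing mp4 reported in this issue. Per the quoted docs, passing delete_on_close=False inside a with block is another option on Python versions that support that parameter.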

Nice job looking into this!

abi changed the title from "I solved the problem that when running the video generation web page, the error " Error generating code. Check the Developer Console AND the backend logs for details. Feel free to open a Github issue. " was reported," to "Fix video tempfile deletion prior to close" on Apr 22, 2024