-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathloading_utils.py
More file actions
137 lines (112 loc) · 4.12 KB
/
loading_utils.py
File metadata and controls
137 lines (112 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
from typing import Callable, List, Optional, Union
import PIL.Image
import PIL.ImageOps
import requests
import tempfile
from urllib.parse import unquote, urlparse
from .import_utils import BACKENDS_MAPPING, is_imageio_available
def load_image(image: Union[str, PIL.Image.Image]) -> PIL.Image.Image:
    """
    Resolve `image` into an RGB PIL image.

    Args:
        image (`str` or `PIL.Image.Image`):
            Either an `http://` / `https://` URL, a path to an existing local file, or an already
            instantiated PIL image.

    Returns:
        `PIL.Image.Image`:
            The image after `PIL.ImageOps.exif_transpose` has been applied and it has been converted to "RGB".

    Raises:
        `ValueError`:
            If `image` is a string that is neither an http(s) URL nor an existing file, or if it is
            neither a string nor a PIL image.
    """
    if not isinstance(image, str):
        # Anything that is not a string must already be a PIL image.
        if not isinstance(image, PIL.Image.Image):
            raise ValueError("Incorrect format used for image. Should be an url linking to an image, a local path, or a PIL image.")
        pil_image = image
    elif image.startswith(("http://", "https://")):
        # Remote image: hand the raw response stream to PIL.
        pil_image = PIL.Image.open(requests.get(image, stream=True).raw)
    elif os.path.isfile(image):
        pil_image = PIL.Image.open(image)
    else:
        raise ValueError(
            f"Incorrect path or url, URLs must start with `http://` or `https://`, and {image} is not a valid path"
        )

    # Apply the EXIF orientation tag first, then normalise the mode.
    return PIL.ImageOps.exif_transpose(pil_image).convert("RGB")
def load_video(
    video: str,
    convert_method: Optional[Callable[[List[PIL.Image.Image]], List[PIL.Image.Image]]] = None,
) -> List[PIL.Image.Image]:
    """
    Loads `video` to a list of PIL Image.

    Args:
        video (`str`):
            A URL or Path to a video to convert to a list of PIL Image format. URLs are downloaded to a
            temporary file which is removed again before this function returns or raises.
        convert_method (Callable[[List[PIL.Image.Image]], List[PIL.Image.Image]], *optional*):
            A conversion method to apply to the list of frames after loading it. When set to `None` the
            frames are returned as decoded, without any conversion.

    Returns:
        `List[PIL.Image.Image]`:
            The video as a list of PIL images.

    Raises:
        `ValueError`:
            If `video` is neither an http(s) URL nor an existing file, or if the download does not
            return status code 200.
        `ImportError`:
            If a non-GIF video is requested and `imageio` is not available.
        `AttributeError`:
            If `imageio` cannot locate an ffmpeg executable.
    """
    is_url = video.startswith(("http://", "https://"))
    is_file = os.path.isfile(video)

    if not (is_url or is_file):
        raise ValueError(
            f"Incorrect path or URL. URLs must start with `http://` or `https://`, and {video} is not a valid path."
        )

    # Path of the downloaded temporary file, if any; cleaned up in `finally`.
    temp_path = None
    pil_images = []

    try:
        if is_url:
            # The context manager closes the connection on every exit path.
            with requests.get(video, stream=True) as response:
                if response.status_code != 200:
                    raise ValueError(f"Failed to download video. Status code: {response.status_code}")

                # Keep the remote file extension so the decoder can be chosen from it.
                parsed_url = urlparse(video)
                file_name = os.path.basename(unquote(parsed_url.path))
                suffix = os.path.splitext(file_name)[1] or ".mp4"

                # Write through the temp-file handle itself so it is closed deterministically.
                with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp_file:
                    # Record the path before writing so a failed download is still cleaned up.
                    temp_path = tmp_file.name
                    for chunk in response.iter_content(chunk_size=8192):
                        tmp_file.write(chunk)

            video = temp_path

        if video.endswith(".gif"):
            # `copy()` detaches every frame from the file, so the handle can be closed afterwards.
            with PIL.Image.open(video) as gif:
                try:
                    while True:
                        pil_images.append(gif.copy())
                        gif.seek(gif.tell() + 1)
                except EOFError:
                    # Seeking past the last frame signals the end of the animation.
                    pass
        else:
            if not is_imageio_available():
                raise ImportError(BACKENDS_MAPPING["imageio"][1].format("load_video"))
            import imageio

            try:
                imageio.plugins.ffmpeg.get_exe()
            except AttributeError as err:
                raise AttributeError(
                    "Unable to find an ffmpeg installation on your machine. Please install via `pip install imageio-ffmpeg`"
                ) from err

            with imageio.get_reader(video) as reader:
                # Read all frames
                pil_images.extend(PIL.Image.fromarray(frame) for frame in reader)
    finally:
        # Remove the download even when decoding failed, so no temp file is leaked.
        if temp_path is not None:
            os.remove(temp_path)

    if convert_method is not None:
        pil_images = convert_method(pil_images)

    return pil_images