Skip to content

Commit ab9a957

Browse files
authored
Merge pull request #61 from Frameio/jh/download-improvements
[DEVREL-639] Add multi-threaded downloading
2 parents 078ae2c + 2611592 commit ab9a957

6 files changed

Lines changed: 217 additions & 27 deletions

File tree

examples/download_asset.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
from frameioclient import FrameioClient
44

5-
def download_file(asset_id):
5+
def benchmark(asset_id):
66
token = os.getenv("FRAMEIO_TOKEN")
77
client = FrameioClient(token)
88
asset_info = client.get_asset(asset_id)
9-
client.download(asset_info, "downloads")
9+
accelerated_filename = client.download(asset_info, "downloads", prefix="accelerated_", multi_part=True, concurrency=20)
10+
11+
# print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed))
1012

1113
if __name__ == "__main__":
12-
download_file("6cc9a45f-64a7-456e-a95f-98687220bf6e")
14+
# download_file("60ff4cca-f97b-4311-be24-0eecd6970c01")
15+
benchmark("20a1df34-e8ad-48fd-b455-c68294cc7f71")
16+
# benchmark("9cee7966-7db1-4066-b326-f9e6f5e929e4")

frameioclient/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def upload(self, asset, file):
401401
uploader = FrameioUploader(asset, file)
402402
uploader.upload()
403403

404-
def download(self, asset, download_folder):
404+
def download(self, asset, download_folder, prefix=None, multi_part=False, concurrency=5):
405405
"""
406406
Download an asset. The method will exit once the file is downloaded.
407407
@@ -413,8 +413,8 @@ def download(self, asset, download_folder):
413413
414414
client.download(asset, "~./Downloads")
415415
"""
416-
downloader = FrameioDownloader(asset, download_folder)
417-
downloader.download()
416+
downloader = FrameioDownloader(asset, download_folder, prefix, multi_part, concurrency)
417+
return downloader.download_handler()
418418

419419
def get_comment(self, comment_id, **kwargs):
420420
"""

frameioclient/download.py

Lines changed: 145 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,62 @@
1+
import io
12
import os
23
import math
4+
import time
35
import requests
6+
import threading
7+
import concurrent.futures
48

5-
from .exceptions import DownloadException
9+
from .utils import format_bytes, normalize_filename
10+
from .exceptions import DownloadException, WatermarkIDDownloadException, AssetNotFullyUploaded
11+
12+
thread_local = threading.local()
613

714
class FrameioDownloader(object):
8-
def __init__(self, asset, download_folder):
15+
def __init__(self, asset, download_folder, prefix, multi_part=False, concurrency=5):
16+
self.multi_part = multi_part
917
self.asset = asset
18+
self.asset_type = None
1019
self.download_folder = download_folder
20+
self.resolution_map = dict()
21+
self.destination = None
22+
self.watermarked = asset['is_session_watermarked'] # Default is probably false
23+
self.file_size = asset["filesize"]
24+
self.concurrency = concurrency
25+
self.futures = list()
26+
self.chunk_size = (25 * 1024 * 1024) # 25 MB chunk size
27+
self.chunks = math.ceil(self.file_size/self.chunk_size)
28+
self.prefix = prefix
29+
self.filename = normalize_filename(asset["name"])
30+
31+
self._evaluate_asset()
32+
33+
def _evaluate_asset(self):
34+
if self.asset.get("_type") != "file":
35+
raise DownloadException(message="Unsupport Asset type: {}".format(self.asset.get("_type")))
36+
37+
if self.asset.get("upload_completed_at") == None:
38+
raise AssetNotFullyUploaded
39+
40+
def _get_session(self):
41+
if not hasattr(thread_local, "session"):
42+
thread_local.session = requests.Session()
43+
return thread_local.session
44+
45+
def _create_file_stub(self):
46+
try:
47+
fp = open(self.destination, "wb")
48+
fp.write(b"\0" * self.file_size)
49+
fp.close()
50+
except FileExistsError as e:
51+
print(e)
52+
raise e
53+
return True
1154

1255
def get_download_key(self):
1356
try:
1457
url = self.asset['original']
1558
except KeyError as e:
16-
if self.asset['is_session_watermarked'] == True:
59+
if self.watermarked == True:
1760
resolution_list = list()
1861
try:
1962
for resolution_key, download_url in sorted(self.asset['downloads'].items()):
@@ -31,21 +74,109 @@ def get_download_key(self):
3174
except KeyError:
3275
raise DownloadException
3376
else:
34-
raise DownloadException
77+
raise WatermarkIDDownloadException
3578

3679
return url
3780

38-
def download(self):
39-
original_filename = self.asset['name']
40-
final_destination = os.path.join(self.download_folder, original_filename)
81+
def get_path(self):
82+
if self.prefix != None:
83+
self.filename = self.prefix + self.filename
4184

42-
print("Final destiation: {}".format(final_destination))
85+
if self.destination == None:
86+
final_destination = os.path.join(self.download_folder, self.filename)
87+
self.destination = final_destination
88+
89+
return self.destination
4390

44-
if os.path.isfile(final_destination):
45-
return final_destination
91+
def download_handler(self):
92+
if os.path.isfile(self.get_path()):
93+
print("File already exists at this location.")
94+
return self.destination
4695
else:
4796
url = self.get_download_key()
48-
r = requests.get(url)
49-
open(final_destination, 'wb').write(r.content)
50-
return final_destination
51-
97+
98+
if self.watermarked == True:
99+
return self.download(url)
100+
else:
101+
if self.multi_part == True:
102+
return self.multi_part_download(url)
103+
else:
104+
return self.download(url)
105+
106+
def download(self, url):
107+
start_time = time.time()
108+
print("Beginning download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size")))
109+
110+
# Downloading
111+
r = requests.get(url)
112+
open(self.destination, "wb").write(r.content)
113+
114+
download_time = time.time() - start_time
115+
download_speed = format_bytes(math.ceil(self.file_size/(download_time)))
116+
print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed))
117+
118+
return self.destination, download_speed
119+
120+
def multi_part_download(self, url):
121+
start_time = time.time()
122+
123+
# Generate stub
124+
try:
125+
self._create_file_stub()
126+
127+
except Exception as e:
128+
raise DownloadException(message=e)
129+
130+
offset = math.ceil(self.file_size / self.chunks)
131+
in_byte = 0 # Set initially here, but then override
132+
133+
print("Multi-part download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size")))
134+
135+
# Queue up threads
136+
with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
137+
for i in range(int(self.chunks)):
138+
out_byte = offset * (i+1) # Increment by the iterable + 1 so we don't mutiply by zero
139+
task = (url, in_byte, out_byte, i)
140+
141+
self.futures.append(executor.submit(self.download_chunk, task))
142+
in_byte = out_byte # Reset new in byte equal to last out byte
143+
144+
# Wait on threads to finish
145+
for future in concurrent.futures.as_completed(self.futures):
146+
try:
147+
status = future.result()
148+
print(status)
149+
except Exception as exc:
150+
print(exc)
151+
152+
# Calculate and print stats
153+
download_time = time.time() - start_time
154+
download_speed = format_bytes(math.ceil(self.file_size/(download_time)))
155+
print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed))
156+
157+
return self.destination
158+
159+
def download_chunk(self, task):
160+
# Download a particular chunk
161+
# Called by the threadpool executor
162+
163+
url = task[0]
164+
start_byte = task[1]
165+
end_byte = task[2]
166+
chunk_number = task[3]
167+
168+
session = self._get_session()
169+
print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks))
170+
171+
# Specify the starting and ending of the file
172+
headers = {"Range": "bytes=%d-%d" % (start_byte, end_byte)}
173+
174+
# Grab the data as a stream
175+
r = session.get(url, headers=headers, stream=True)
176+
177+
with open(self.destination, "r+b") as fp:
178+
fp.seek(start_byte) # Seek to the right of the file
179+
fp.write(r.content) # Write the data
180+
print("Done writing chunk {}/{}".format(chunk_number + 1, self.chunks))
181+
182+
return "Complete!"

frameioclient/exceptions.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def __init__(
1111
self.message = message
1212
super().__init__(self.message)
1313

14-
class DownloadException(Exception):
14+
class WatermarkIDDownloadException(Exception):
1515
"""Exception raised when trying to download a file where there is no available
1616
download URL.
1717
"""
@@ -20,4 +20,24 @@ def __init__(
2020
message="This file is unavailable for download due to security and permission settings."
2121
):
2222
self.message = message
23-
super().__init__(self.message)
23+
super().__init__(self.message)
24+
25+
class DownloadException(Exception):
26+
"""Exception raised when trying to download a file
27+
"""
28+
def __init__(
29+
self,
30+
message="Generic Dowload exception."
31+
):
32+
self.message = message
33+
super().__init__(self.message)
34+
35+
class AssetNotFullyUploaded(Exception):
36+
"""Exception raised when trying to download a file that isn't yet fully upload.
37+
"""
38+
def __init__(
39+
self,
40+
message="Unable to download this asset because it not yet fully uploaded."
41+
):
42+
self.message = message
43+
super().__init__(self.message)

frameioclient/utils.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import xxhash
22
import sys
3+
import re
34

45
KB = 1024
56
MB = KB * KB
@@ -24,20 +25,26 @@ def stream(func, page=1, page_size=20):
2425

2526
page += 1
2627

27-
def format_bytes(size):
28+
def format_bytes(size, type="speed"):
2829
"""
2930
Convert bytes to KB/MB/GB/TB/s
3031
"""
3132
# 2**10 = 1024
3233
power = 2**10
3334
n = 0
34-
power_labels = {0 : 'B/s', 1: 'KB/s', 2: 'MB/s', 3: 'GB/s', 4: 'TB/s'}
35+
power_labels = {0 : 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
3536

3637
while size > power:
3738
size /= power
3839
n += 1
3940

40-
return " ".join((str(round(size, 2)), power_labels[n]))
41+
formatted = " ".join((str(round(size, 2)), power_labels[n]))
42+
43+
if type == "speed":
44+
return formatted + "/s"
45+
46+
elif type == "size":
47+
return formatted
4148

4249
def calculate_hash(file_path):
4350
"""
@@ -73,4 +80,32 @@ def compare_items(dict1, dict2):
7380
if comparison == False:
7481
print("File mismatch between upload and download")
7582

76-
return comparison
83+
return comparison
84+
85+
def get_valid_filename(s):
86+
"""
87+
Strip out invalid characters from a filename using regex
88+
"""
89+
s = str(s).strip().replace(' ', '_')
90+
return re.sub(r'(?u)[^-\w.]', '', s)
91+
92+
def normalize_filename(fn):
93+
"""
94+
Normalize filename using pure python
95+
"""
96+
validchars = "-_.() "
97+
out = ""
98+
99+
if isinstance(fn, str):
100+
pass
101+
elif isinstance(fn, unicode):
102+
fn = str(fn.decode('utf-8', 'ignore'))
103+
else:
104+
pass
105+
106+
for c in fn:
107+
if str.isalpha(c) or str.isdigit(c) or (c in validchars):
108+
out += c
109+
else:
110+
out += "_"
111+
return out

tests/integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def test_download(client, override=False):
107107
start_time = time.time()
108108
print("{}/{} Beginning to download: {}".format(count, len(asset_list), asset['name']))
109109

110-
client.download(asset, 'downloads')
110+
client.download(asset, 'downloads', multi_part=True, concurrency=20)
111111

112112
download_time = time.time() - start_time
113113
download_speed = format_bytes(ceil(asset['filesize']/(download_time)))

0 commit comments

Comments
 (0)