1- import requests
2- import math
1+ import io
32import os
3+ import math
4+ import time
5+ import requests
6+ import threading
7+ import concurrent .futures
8+
9+ from .utils import format_bytes , normalize_filename
10+ from .exceptions import DownloadException , WatermarkIDDownloadException , AssetNotFullyUploaded
11+
# Per-thread storage. FrameioDownloader._get_session() keeps one
# requests.Session per worker thread on this object so that chunk downloads
# running in a thread pool do not share a session between threads.
thread_local = threading.local()

class FrameioDownloader(object):
    """Download a single file asset to a local folder.

    The asset is a dict; this class reads its ``_type``,
    ``upload_completed_at``, ``name``, ``filesize``,
    ``is_session_watermarked``, ``original`` and ``downloads`` keys.
    A download is either a single streamed GET or, when ``multi_part`` is
    enabled, several HTTP Range requests run in a thread pool.
    """

    # Block size used when streaming a response body to disk.
    STREAM_BLOCK_SIZE = 1024 * 1024

    def __init__(self, asset, download_folder, prefix=None, multi_part=False, concurrency=5):
        """Store the download settings and validate the asset.

        Args:
            asset: Asset dict (see class docstring for the keys that are read).
            download_folder: Directory the file is written into.
            prefix: Optional string prepended to the normalized file name.
                Defaults to ``None`` so the older two-argument call still works.
            multi_part: When true, non-watermarked assets are fetched in
                parallel byte ranges.
            concurrency: Maximum number of worker threads for multi-part mode.

        Raises:
            DownloadException: The asset's ``_type`` is not ``"file"``.
            AssetNotFullyUploaded: ``upload_completed_at`` is missing or None.
        """
        self.multi_part = multi_part
        self.asset = asset
        self.asset_type = None
        self.download_folder = download_folder
        self.resolution_map = dict()
        self.destination = None
        # A missing flag is treated as "not watermarked" instead of raising.
        self.watermarked = asset.get('is_session_watermarked', False)
        self.file_size = asset["filesize"]
        self.concurrency = concurrency
        self.futures = list()
        self.chunk_size = (25 * 1024 * 1024)  # 25 MB chunk size
        self.chunks = math.ceil(self.file_size / self.chunk_size)
        self.prefix = prefix
        self.filename = normalize_filename(asset["name"])

        self._evaluate_asset()

    def _evaluate_asset(self):
        """Reject assets that are not files or have not finished uploading."""
        if self.asset.get("_type") != "file":
            raise DownloadException(message="Unsupport Asset type: {}".format(self.asset.get("_type")))

        if self.asset.get("upload_completed_at") is None:
            raise AssetNotFullyUploaded

    def _get_session(self):
        """Return the calling thread's requests.Session, creating it on first use."""
        if not hasattr(thread_local, "session"):
            thread_local.session = requests.Session()
        return thread_local.session

    def _create_file_stub(self):
        """Create the destination file pre-sized to ``self.file_size`` bytes.

        Only the final byte is written, so the file reaches its full length
        without building a ``file_size``-long buffer in memory. Any existing
        file at the destination is truncated (mode ``"wb"``).

        Returns:
            True once the stub exists.
        """
        with open(self.destination, "wb") as fp:
            if self.file_size > 0:
                fp.seek(self.file_size - 1)
                fp.write(b"\0")
        return True

    def _remove_partial(self):
        """Best-effort removal of an incomplete destination file.

        A leftover partial file would otherwise make download_handler()
        report "File already exists" on the next attempt.
        """
        try:
            os.remove(self.destination)
        except OSError:
            pass

    def _chunk_ranges(self):
        """Return the inclusive ``(start, end)`` byte range of every chunk.

        The file is split into ``self.chunks`` pieces of equal size (the last
        one may be shorter). Ranges do not overlap and never reach past the
        last byte of the file. An empty list is returned for an empty file.
        """
        chunk_count = int(self.chunks)
        if chunk_count <= 0 or self.file_size <= 0:
            return []

        offset = math.ceil(self.file_size / chunk_count)
        ranges = []
        for index in range(chunk_count):
            start = offset * index
            if start >= self.file_size:
                break
            # HTTP Range ends are inclusive, hence the -1.
            end = min(start + offset, self.file_size) - 1
            ranges.append((start, end))
        return ranges

    def _report_speed(self, start_time):
        """Print the transfer summary and return the formatted speed."""
        # Guard against a zero elapsed time on coarse clocks.
        download_time = max(time.time() - start_time, 1e-6)
        download_speed = format_bytes(math.ceil(self.file_size / download_time))
        print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed))
        return download_speed

    def get_download_key(self):
        """Return the URL to download the asset from.

        ``asset['original']`` is used when present. Otherwise, for a
        watermarked asset, the entry of ``asset['downloads']`` with the
        numerically highest resolution is used. Keys are expected to look like
        ``<codec>_<resolution>[_...]``; entries whose second ``_`` field is
        not an integer, or whose URL is None, are skipped.

        Raises:
            WatermarkIDDownloadException: No ``original`` key and the asset is
                not watermarked.
            DownloadException: No ``downloads`` key, or no usable entry in it.
        """
        try:
            return self.asset['original']
        except KeyError:
            pass

        if not self.watermarked:
            raise WatermarkIDDownloadException

        try:
            downloads = self.asset['downloads'] or {}
        except KeyError:
            raise DownloadException

        best_url = None
        best_resolution = -1
        # Sorted so that ties between equal resolutions resolve deterministically.
        for resolution_key, download_url in sorted(downloads.items()):
            if download_url is None:
                continue
            try:
                resolution = int(resolution_key.split("_")[1])
            except (IndexError, ValueError):
                continue
            if resolution > best_resolution:
                best_resolution = resolution
                best_url = download_url

        if best_url is None:
            raise DownloadException

        return best_url

    def get_path(self):
        """Return the destination path, computing it on the first call.

        The prefix is applied to ``self.filename`` exactly once, so repeated
        calls return the same path and do not stack prefixes.
        """
        if self.destination is None:
            if self.prefix is not None:
                self.filename = self.prefix + self.filename
            self.destination = os.path.join(self.download_folder, self.filename)

        return self.destination

    def download_handler(self):
        """Download the asset unless the destination file already exists.

        Returns:
            The destination path if the file already exists or multi-part
            mode was used; otherwise the ``(destination, speed)`` tuple
            returned by download().
        """
        if os.path.isfile(self.get_path()):
            print("File already exists at this location.")
            return self.destination

        url = self.get_download_key()

        # Watermarked assets always use the single-request path.
        if self.multi_part and not self.watermarked:
            return self.multi_part_download(url)
        return self.download(url)

    def download(self, url):
        """Stream ``url`` to the destination file with a single GET request.

        Returns:
            Tuple of ``(destination path, formatted download speed)``.

        Raises:
            requests.HTTPError: The server answered with an error status.
        """
        start_time = time.time()
        destination = self.get_path()
        print("Beginning download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size")))

        # Downloading
        try:
            with requests.get(url, stream=True) as r:
                # Fail before opening the file so an error page is never saved.
                r.raise_for_status()
                with open(destination, "wb") as fp:
                    for block in r.iter_content(chunk_size=self.STREAM_BLOCK_SIZE):
                        fp.write(block)
        except Exception:
            self._remove_partial()
            raise

        download_speed = self._report_speed(start_time)

        return self.destination, download_speed

    def multi_part_download(self, url):
        """Download ``url`` as parallel byte ranges written into one file.

        A full-size stub file is created first; each worker then writes its
        range at the matching offset.

        Returns:
            The destination path.

        Raises:
            DownloadException: The stub could not be created, or at least one
                chunk failed (the incomplete file is removed in that case).
        """
        start_time = time.time()
        self.get_path()

        # Generate stub
        try:
            self._create_file_stub()
        except Exception as e:
            raise DownloadException(message=e)

        print("Multi-part download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size")))

        # Start from a clean list so a reused instance does not re-wait on old futures.
        self.futures = list()
        failures = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
            # Queue up threads
            for chunk_number, (in_byte, out_byte) in enumerate(self._chunk_ranges()):
                task = (url, in_byte, out_byte, chunk_number)
                self.futures.append(executor.submit(self.download_chunk, task))

            # Wait on threads to finish
            for future in concurrent.futures.as_completed(self.futures):
                try:
                    print(future.result())
                except Exception as exc:
                    print(exc)
                    failures.append(exc)

        if failures:
            # The stub still holds zero bytes where chunks failed; do not keep it.
            self._remove_partial()
            raise DownloadException(
                message="{} of {} chunks failed to download: {}".format(
                    len(failures), len(self.futures), failures[0]
                )
            )

        # Calculate and print stats
        self._report_speed(start_time)

        return self.destination

    def download_chunk(self, task):
        """Download one byte range and write it into the destination file.

        Called by the thread pool executor.

        Args:
            task: Tuple ``(url, start_byte, end_byte, chunk_number)`` where
                ``end_byte`` is inclusive, as in an HTTP Range header.

        Returns:
            The string ``"Complete!"``.

        Raises:
            requests.HTTPError: The server answered with an error status.
            DownloadException: The server ignored the Range header for a
                request that does not cover the whole file.
        """
        url, start_byte, end_byte, chunk_number = task[0], task[1], task[2], task[3]

        session = self._get_session()
        print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks))

        # Specify the starting and ending of the file
        headers = {"Range": "bytes=%d-%d" % (start_byte, end_byte)}

        # Grab the data as a stream
        with session.get(url, headers=headers, stream=True) as r:
            r.raise_for_status()

            # A non-206 reply carries the whole file; writing that at this
            # chunk's offset would corrupt the output unless the requested
            # range was the whole file anyway.
            covers_whole_file = start_byte == 0 and end_byte >= self.file_size - 1
            if r.status_code != 206 and not covers_whole_file:
                raise DownloadException(
                    message="Server ignored Range request for chunk {} (status {})".format(
                        chunk_number + 1, r.status_code
                    )
                )

            with open(self.destination, "r+b") as fp:
                fp.seek(start_byte)  # Seek to this chunk's offset in the file
                for block in r.iter_content(chunk_size=self.STREAM_BLOCK_SIZE):
                    fp.write(block)

        print("Done writing chunk {}/{}".format(chunk_number + 1, self.chunks))

        return "Complete!"
0 commit comments