wmill.s3_reader
"""Stream an S3 file through Windmill's ``download_s3_file`` job-helper endpoint."""

from io import BufferedReader, BytesIO
from typing import Optional, Union

import httpx


class S3BufferedReader(BufferedReader):
    """Buffered reader over an S3 file streamed from the Windmill
    ``/w/{workspace}/job_helpers/download_s3_file`` HTTP endpoint.

    Must be used as a context manager: the HTTP stream is opened in
    ``__enter__`` and closed in ``__exit__``.
    """

    def __init__(
        self,
        workspace: str,
        windmill_client: httpx.Client,
        file_key: str,
        s3_resource_path: Optional[str],
        storage: Optional[str],
    ):
        # Build the query string, omitting optional parameters that were not given.
        params = {"file_key": file_key}
        if s3_resource_path is not None:
            params["s3_resource_path"] = s3_resource_path
        if storage is not None:
            params["storage"] = storage
        # timeout=None: S3 downloads can be arbitrarily large and slow.
        self._context_manager = windmill_client.stream(
            "GET",
            f"/w/{workspace}/job_helpers/download_s3_file",
            params=params,
            timeout=None,
        )

    def __enter__(self):
        response = self._context_manager.__enter__()
        if response.status_code >= 400:
            # Surface the server-provided error body in the exception message.
            error_bytes = response.read()
            try:
                error_text = error_bytes.decode('utf-8')
            except UnicodeDecodeError:
                error_text = str(error_bytes)
            raise httpx.HTTPStatusError(
                f"Failed to load S3 file: {response.status_code} {response.reason_phrase} - {error_text}",
                request=response.request,
                response=response,
            )
        self._iterator = response.iter_bytes()
        # Bytes received from the stream but not yet handed to the caller.
        self._read_buffer = b""
        return self

    def peek(self, size=0):
        raise NotImplementedError("Not implemented, use read() instead")

    def read(self, size=-1):
        """Read and return up to *size* bytes (all remaining bytes when
        *size* is negative), per the ``BufferedReader`` contract.

        Fix over the original: ``size`` previously counted HTTP chunks
        from ``iter_bytes()`` instead of bytes, so ``read(n)`` could
        return far more than ``n`` bytes. Surplus bytes from an
        over-long chunk are kept in ``self._read_buffer`` for the next
        call. Returns ``b""`` at EOF.
        """
        if size < 0:
            parts = [self._read_buffer] if self._read_buffer else []
            self._read_buffer = b""
            parts.extend(self._iterator)
            return b"".join(parts)
        parts = []
        total = 0
        if self._read_buffer:
            parts.append(self._read_buffer)
            total = len(self._read_buffer)
            self._read_buffer = b""
        while total < size:
            try:
                chunk = next(self._iterator)
            except StopIteration:
                break
            parts.append(chunk)
            total += len(chunk)
        data = b"".join(parts)
        if len(data) > size:
            # Keep the surplus for the next read() call.
            self._read_buffer = data[size:]
            data = data[:size]
        return data

    def read1(self, size=-1):
        # No distinction between read() and read1() for this stream.
        return self.read(size)

    def __exit__(self, *args):
        # Close the underlying HTTP stream.
        self._context_manager.__exit__(*args)


def bytes_generator(buffered_reader: Union[BufferedReader, BytesIO]):
    """Yield the reader's content in 50 KiB chunks until EOF."""
    while True:
        chunk = buffered_reader.read(50 * 1024)
        if not chunk:
            break
        yield chunk
class
S3BufferedReader(_io.BufferedReader):
class S3BufferedReader(BufferedReader):
    """Buffered reader over an S3 file streamed from the Windmill
    ``/w/{workspace}/job_helpers/download_s3_file`` HTTP endpoint.

    Must be used as a context manager: the HTTP stream is opened in
    ``__enter__`` and closed in ``__exit__``.
    """

    def __init__(
        self,
        workspace: str,
        windmill_client: httpx.Client,
        file_key: str,
        s3_resource_path: Optional[str],
        storage: Optional[str],
    ):
        # Build the query string, omitting optional parameters that were not given.
        params = {"file_key": file_key}
        if s3_resource_path is not None:
            params["s3_resource_path"] = s3_resource_path
        if storage is not None:
            params["storage"] = storage
        # timeout=None: S3 downloads can be arbitrarily large and slow.
        self._context_manager = windmill_client.stream(
            "GET",
            f"/w/{workspace}/job_helpers/download_s3_file",
            params=params,
            timeout=None,
        )

    def __enter__(self):
        response = self._context_manager.__enter__()
        if response.status_code >= 400:
            # Surface the server-provided error body in the exception message.
            error_bytes = response.read()
            try:
                error_text = error_bytes.decode('utf-8')
            except UnicodeDecodeError:
                error_text = str(error_bytes)
            raise httpx.HTTPStatusError(
                f"Failed to load S3 file: {response.status_code} {response.reason_phrase} - {error_text}",
                request=response.request,
                response=response,
            )
        self._iterator = response.iter_bytes()
        # Bytes received from the stream but not yet handed to the caller.
        self._read_buffer = b""
        return self

    def peek(self, size=0):
        raise NotImplementedError("Not implemented, use read() instead")

    def read(self, size=-1):
        """Read and return up to *size* bytes (all remaining bytes when
        *size* is negative), per the ``BufferedReader`` contract.

        Fix over the original: ``size`` previously counted HTTP chunks
        from ``iter_bytes()`` instead of bytes, so ``read(n)`` could
        return far more than ``n`` bytes. Surplus bytes from an
        over-long chunk are kept in ``self._read_buffer`` for the next
        call. Returns ``b""`` at EOF.
        """
        if size < 0:
            parts = [self._read_buffer] if self._read_buffer else []
            self._read_buffer = b""
            parts.extend(self._iterator)
            return b"".join(parts)
        parts = []
        total = 0
        if self._read_buffer:
            parts.append(self._read_buffer)
            total = len(self._read_buffer)
            self._read_buffer = b""
        while total < size:
            try:
                chunk = next(self._iterator)
            except StopIteration:
                break
            parts.append(chunk)
            total += len(chunk)
        data = b"".join(parts)
        if len(data) > size:
            # Keep the surplus for the next read() call.
            self._read_buffer = data[size:]
            data = data[:size]
        return data

    def read1(self, size=-1):
        # No distinction between read() and read1() for this stream.
        return self.read(size)

    def __exit__(self, *args):
        # Close the underlying HTTP stream.
        self._context_manager.__exit__(*args)
Create a buffered reader that streams an S3 file over HTTP from the Windmill `download_s3_file` job-helper endpoint using the given `httpx` client. (Note: the inherited `_io.BufferedReader` docstring about "a readable raw IO object" does not apply — no raw IO object is wrapped.)
S3BufferedReader( workspace: str, windmill_client: httpx.Client, file_key: str, s3_resource_path: Optional[str], storage: Optional[str])
def __init__(self, workspace: str, windmill_client: httpx.Client, file_key: str, s3_resource_path: Optional[str], storage: Optional[str]):
    """Prepare (but do not open) a streaming GET request for the S3 file.

    The request targets the Windmill job-helper endpoint; optional query
    parameters are included only when a value was supplied.
    """
    params = {"file_key": file_key}
    optional_params = {"s3_resource_path": s3_resource_path, "storage": storage}
    params.update({key: value for key, value in optional_params.items() if value is not None})
    # timeout=None because S3 downloads may take arbitrarily long.
    self._context_manager = windmill_client.stream(
        "GET",
        f"/w/{workspace}/job_helpers/download_s3_file",
        params=params,
        timeout=None,
    )
def
read(self, size=-1):
def read(self, size=-1):
    """Read and return up to *size* bytes from the streamed response
    (all remaining bytes when *size* is negative), matching the
    ``BufferedReader.read`` contract.

    Fix over the original: ``size`` previously counted HTTP chunks
    yielded by ``iter_bytes()`` rather than bytes, so ``read(n)`` could
    return far more than ``n`` bytes. Surplus bytes from an over-long
    chunk are buffered on the instance for the next call.
    Returns ``b""`` at EOF.
    """
    # Leftover bytes from a previous call; lazily initialised so the
    # method works even though __init__ does not create the attribute.
    pending = getattr(self, "_read_buffer", b"")
    self._read_buffer = b""
    if size < 0:
        parts = [pending] if pending else []
        parts.extend(self._iterator)
        return b"".join(parts)
    parts = [pending] if pending else []
    total = len(pending)
    while total < size:
        try:
            chunk = next(self._iterator)
        except StopIteration:
            break
        parts.append(chunk)
        total += len(chunk)
    data = b"".join(parts)
    if len(data) > size:
        # Keep the surplus for the next read() call.
        self._read_buffer = data[size:]
        data = data[:size]
    return data
Read and return up to *size* bytes.
If the argument is omitted, None, or negative, reads and returns all data until EOF.
If the argument is positive, and the underlying raw stream is not 'interactive', multiple raw reads may be issued to satisfy the byte count (unless EOF is reached first). But for interactive raw streams (as well as sockets and pipes), at most one raw read will be issued, and a short result does not imply that EOF is imminent.
Returns an empty bytes object on EOF.
This implementation always returns a bytes object and never returns None: data is drawn from the buffered HTTP response stream, so the inherited note about non-blocking raw streams does not apply.
def
bytes_generator(buffered_reader: Union[_io.BufferedReader, _io.BytesIO]):