wmill.s3_reader

from io import BufferedReader, BytesIO
from typing import Optional, Union

import httpx


class S3BufferedReader(BufferedReader):
    def __init__(self, workspace: str, windmill_client: httpx.Client, file_key: str, s3_resource_path: Optional[str], storage: Optional[str]):
        # Build the query parameters and prepare (but do not yet open) a
        # streaming GET request for the S3 object on the job_helpers endpoint.
        params = {
            "file_key": file_key,
        }
        if s3_resource_path is not None:
            params["s3_resource_path"] = s3_resource_path
        if storage is not None:
            params["storage"] = storage
        self._context_manager = windmill_client.stream(
            "GET",
            f"/w/{workspace}/job_helpers/download_s3_file",
            params=params,
            timeout=None,
        )

    def __enter__(self):
        # Open the streaming response and fail fast on HTTP errors.
        reader = self._context_manager.__enter__()
        if reader.status_code >= 400:
            error_bytes = reader.read()
            try:
                error_text = error_bytes.decode('utf-8')
            except UnicodeDecodeError:
                error_text = str(error_bytes)
            raise httpx.HTTPStatusError(
                f"Failed to load S3 file: {reader.status_code} {reader.reason_phrase} - {error_text}",
                request=reader.request,
                response=reader
            )
        self._iterator = reader.iter_bytes()
        return self

    def peek(self, size=0):
        raise Exception("Not implemented, use read() instead")

    def read(self, size=-1):
        # A negative size drains the whole stream; a non-negative size consumes
        # up to `size` chunks from the response iterator (not `size` bytes).
        read_result = []
        if size < 0:
            for b in self._iterator:
                read_result.append(b)
        else:
            for i in range(size):
                try:
                    b = self._iterator.__next__()
                except StopIteration:
                    break
                read_result.append(b)

        return b"".join(read_result)

    def read1(self, size=-1):
        return self.read(size)

    def __exit__(self, *args):
        self._context_manager.__exit__(*args)


def bytes_generator(buffered_reader: Union[BufferedReader, BytesIO]):
    # Yield the reader's content in 50 KiB chunks until read() returns falsy.
    while True:
        byte = buffered_reader.read(50 * 1024)
        if not byte:
            break
        yield byte
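
A minimal usage sketch, not part of the module: the API base URL, token, workspace and file key below are placeholders, and the httpx.Client configuration is an assumption rather than something this module prescribes.

import httpx

from wmill.s3_reader import S3BufferedReader

client = httpx.Client(
    base_url="https://app.windmill.dev/api",          # assumption: your Windmill API base URL
    headers={"Authorization": "Bearer <WM_TOKEN>"},    # assumption: a valid job or user token
)

with S3BufferedReader(
    workspace="my-workspace",          # placeholder workspace id
    windmill_client=client,
    file_key="reports/2024/data.csv",  # placeholder object key
    s3_resource_path=None,             # None: use the workspace default S3 resource
    storage=None,                      # None: use the default storage
) as reader:
    first_chunk = reader.read(1)  # one streamed chunk, not one byte
    remainder = reader.read()     # the rest of the stream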
class S3BufferedReader(_io.BufferedReader):

A buffered reader over an S3 object streamed from Windmill's /w/{workspace}/job_helpers/download_s3_file endpoint. It is intended to be used as a context manager: the HTTP stream is opened in __enter__ (raising httpx.HTTPStatusError on a 4xx/5xx response) and closed again in __exit__.

S3BufferedReader(workspace: str, windmill_client: httpx.Client, file_key: str, s3_resource_path: Optional[str], storage: Optional[str])

Builds the query parameters (file_key, plus s3_resource_path and storage when given) and prepares a streaming GET request to /w/{workspace}/job_helpers/download_s3_file on the provided httpx client. The request is only opened once the reader is entered as a context manager.

def peek(self, size=0):
Not supported: calling peek() raises an exception. Use read() instead.

def read(self, size=-1):

Read from the streamed HTTP response.

If the argument is omitted or negative, reads and returns all remaining data until the stream is exhausted.

If the argument is zero or positive, consumes up to size chunks from the response iterator (not size bytes; each chunk has an arbitrary length) and returns them joined together.

Returns an empty bytes object once the stream is exhausted.
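
Because the argument counts chunks rather than bytes, a caller that needs a byte-exact amount has to accumulate chunks itself. A minimal sketch with a hypothetical helper name (not part of the module):

from wmill.s3_reader import S3BufferedReader

def read_at_least(reader: S3BufferedReader, n: int) -> bytes:
    # Accumulate streamed chunks until at least n bytes have been collected;
    # the result may be slightly longer than n because chunks are not split.
    buf = bytearray()
    while len(buf) < n:
        chunk = reader.read(1)  # one chunk of arbitrary length
        if not chunk:           # stream exhausted
            break
        buf.extend(chunk)
    return bytes(buf)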

def read1(self, size=-1):

An alias for read(): it delegates directly to read(size), so the same chunk-based semantics apply.

Returns an empty bytes object once the stream is exhausted.

def bytes_generator(buffered_reader: Union[_io.BufferedReader, _io.BytesIO]):
Reads from the given buffered reader in 50 KiB (50 * 1024 byte) chunks and yields each chunk, stopping as soon as read() returns an empty result.
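
A short, self-contained sketch of bytes_generator used to hash a stream in 50 KiB chunks; the in-memory BytesIO payload is purely illustrative and stands in for a real file or S3 stream.

import hashlib
from io import BytesIO

from wmill.s3_reader import bytes_generator

payload = BytesIO(b"example payload " * 10_000)  # illustrative in-memory data
digest = hashlib.sha256()
for chunk in bytes_generator(payload):           # 50 KiB chunks
    digest.update(chunk)
print(digest.hexdigest())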