wmill.client
1from __future__ import annotations 2 3import atexit 4import datetime as dt 5import functools 6from io import BufferedReader, BytesIO 7import logging 8import os 9import random 10import time 11import warnings 12import json 13from json import JSONDecodeError 14from typing import Dict, Any, Union, Literal, Optional 15import re 16 17import httpx 18 19from .s3_reader import S3BufferedReader, bytes_generator 20from .s3_types import ( 21 Boto3ConnectionSettings, 22 DuckDbConnectionSettings, 23 PolarsConnectionSettings, 24 S3Object, 25) 26 27_client: "Windmill | None" = None 28 29logger = logging.getLogger("windmill_client") 30 31JobStatus = Literal["RUNNING", "WAITING", "COMPLETED"] 32 33 34class Windmill: 35 def __init__(self, base_url=None, token=None, workspace=None, verify=True): 36 base = ( 37 base_url 38 or os.environ.get("BASE_INTERNAL_URL") 39 or os.environ.get("WM_BASE_URL") 40 ) 41 42 self.base_url = f"{base}/api" 43 self.token = token or os.environ.get("WM_TOKEN") 44 self.headers = { 45 "Content-Type": "application/json", 46 "Authorization": f"Bearer {self.token}", 47 } 48 self.verify = verify 49 self.client = self.get_client() 50 self.workspace = workspace or os.environ.get("WM_WORKSPACE") 51 self.path = os.environ.get("WM_JOB_PATH") 52 53 self.mocked_api = self.get_mocked_api() 54 55 assert self.workspace, ( 56 f"workspace required as an argument or as WM_WORKSPACE environment variable" 57 ) 58 59 def get_mocked_api(self) -> Optional[dict]: 60 mocked_path = os.environ.get("WM_MOCKED_API_FILE") 61 if not mocked_path: 62 return None 63 logger.info("Using mocked API from %s", mocked_path) 64 mocked_api = {"variables": {}, "resources": {}} 65 try: 66 with open(mocked_path, "r") as f: 67 incoming_mocked_api = json.load(f) 68 mocked_api = {**mocked_api, **incoming_mocked_api} 69 except Exception as e: 70 logger.warning( 71 "Error parsing mocked API file at path %s Using empty mocked API.", 72 mocked_path, 73 ) 74 logger.debug(e) 75 return mocked_api 76 77 def get_client(self) -> httpx.Client: 78 return httpx.Client( 79 base_url=self.base_url, 80 headers=self.headers, 81 verify=self.verify, 82 ) 83 84 def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 85 endpoint = endpoint.lstrip("/") 86 resp = self.client.get(f"/{endpoint}", **kwargs) 87 if raise_for_status: 88 try: 89 resp.raise_for_status() 90 except httpx.HTTPStatusError as err: 91 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 92 logger.error(error) 93 raise Exception(error) 94 return resp 95 96 def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 97 endpoint = endpoint.lstrip("/") 98 resp = self.client.post(f"/{endpoint}", **kwargs) 99 if raise_for_status: 100 try: 101 resp.raise_for_status() 102 except httpx.HTTPStatusError as err: 103 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 104 logger.error(error) 105 raise Exception(error) 106 return resp 107 108 def create_token(self, duration=dt.timedelta(days=1)) -> str: 109 endpoint = "/users/tokens/create" 110 payload = { 111 "label": f"refresh {time.time()}", 112 "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"), 113 } 114 return self.post(endpoint, json=payload).text 115 116 def run_script_async( 117 self, 118 path: str = None, 119 hash_: str = None, 120 args: dict = None, 121 scheduled_in_secs: int = None, 122 ) -> str: 123 """Create a script job and return its job id. 124 125 .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead. 126 """ 127 logging.warning( 128 "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.", 129 ) 130 assert not (path and hash_), "path and hash_ are mutually exclusive" 131 return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs) 132 133 def _run_script_async_internal( 134 self, 135 path: str = None, 136 hash_: str = None, 137 args: dict = None, 138 scheduled_in_secs: int = None, 139 ) -> str: 140 """Internal helper for running scripts asynchronously.""" 141 args = args or {} 142 params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {} 143 if os.environ.get("WM_JOB_ID"): 144 params["parent_job"] = os.environ.get("WM_JOB_ID") 145 if os.environ.get("WM_ROOT_FLOW_JOB_ID"): 146 params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID") 147 148 if path: 149 endpoint = f"/w/{self.workspace}/jobs/run/p/{path}" 150 elif hash_: 151 endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}" 152 else: 153 raise Exception("path or hash_ must be provided") 154 155 return self.post(endpoint, json=args, params=params).text 156 157 def run_script_by_path_async( 158 self, 159 path: str, 160 args: dict = None, 161 scheduled_in_secs: int = None, 162 ) -> str: 163 """Create a script job by path and return its job id.""" 164 return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs) 165 166 def run_script_by_hash_async( 167 self, 168 hash_: str, 169 args: dict = None, 170 scheduled_in_secs: int = None, 171 ) -> str: 172 """Create a script job by hash and return its job id.""" 173 return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs) 174 175 def run_flow_async( 176 self, 177 path: str, 178 args: dict = None, 179 scheduled_in_secs: int = None, 180 # can only be set to false if this the job will be fully await and not concurrent with any other job 181 # as otherwise the child flow and its own child will store their state in the parent job which will 182 # lead to incorrectness and failures 183 do_not_track_in_parent: bool = True, 184 ) -> str: 185 """Create a flow job and return its job id.""" 186 args = args or {} 187 params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {} 188 if not do_not_track_in_parent: 189 if os.environ.get("WM_JOB_ID"): 190 params["parent_job"] = os.environ.get("WM_JOB_ID") 191 if os.environ.get("WM_ROOT_FLOW_JOB_ID"): 192 params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID") 193 if path: 194 endpoint = f"/w/{self.workspace}/jobs/run/f/{path}" 195 else: 196 raise Exception("path must be provided") 197 return self.post(endpoint, json=args, params=params).text 198 199 def run_script( 200 self, 201 path: str = None, 202 hash_: str = None, 203 args: dict = None, 204 timeout: dt.timedelta | int | float | None = None, 205 verbose: bool = False, 206 cleanup: bool = True, 207 assert_result_is_not_none: bool = False, 208 ) -> Any: 209 """Run script synchronously and return its result. 210 211 .. deprecated:: Use run_script_by_path or run_script_by_hash instead. 212 """ 213 logging.warning( 214 "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.", 215 ) 216 assert not (path and hash_), "path and hash_ are mutually exclusive" 217 return self._run_script_internal( 218 path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose, 219 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 220 ) 221 222 def _run_script_internal( 223 self, 224 path: str = None, 225 hash_: str = None, 226 args: dict = None, 227 timeout: dt.timedelta | int | float | None = None, 228 verbose: bool = False, 229 cleanup: bool = True, 230 assert_result_is_not_none: bool = False, 231 ) -> Any: 232 """Internal helper for running scripts synchronously.""" 233 args = args or {} 234 235 if verbose: 236 if path: 237 logger.info(f"running `{path}` synchronously with {args = }") 238 elif hash_: 239 logger.info(f"running script with hash `{hash_}` synchronously with {args = }") 240 241 if isinstance(timeout, dt.timedelta): 242 timeout = timeout.total_seconds() 243 244 job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args) 245 return self.wait_job( 246 job_id, timeout, verbose, cleanup, assert_result_is_not_none 247 ) 248 249 def run_script_by_path( 250 self, 251 path: str, 252 args: dict = None, 253 timeout: dt.timedelta | int | float | None = None, 254 verbose: bool = False, 255 cleanup: bool = True, 256 assert_result_is_not_none: bool = False, 257 ) -> Any: 258 """Run script by path synchronously and return its result.""" 259 return self._run_script_internal( 260 path=path, args=args, timeout=timeout, verbose=verbose, 261 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 262 ) 263 264 def run_script_by_hash( 265 self, 266 hash_: str, 267 args: dict = None, 268 timeout: dt.timedelta | int | float | None = None, 269 verbose: bool = False, 270 cleanup: bool = True, 271 assert_result_is_not_none: bool = False, 272 ) -> Any: 273 """Run script by hash synchronously and return its result.""" 274 return self._run_script_internal( 275 hash_=hash_, args=args, timeout=timeout, verbose=verbose, 276 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 277 ) 278 279 def wait_job( 280 self, 281 job_id, 282 timeout: dt.timedelta | int | float | None = None, 283 verbose: bool = False, 284 cleanup: bool = True, 285 assert_result_is_not_none: bool = False, 286 ): 287 def cancel_job(): 288 logger.warning(f"cancelling job: {job_id}") 289 self.post( 290 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 291 json={"reason": "parent script cancelled"}, 292 ).raise_for_status() 293 294 if cleanup: 295 atexit.register(cancel_job) 296 297 start_time = time.time() 298 299 if isinstance(timeout, dt.timedelta): 300 timeout = timeout.total_seconds() 301 302 while True: 303 result_res = self.get( 304 f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True 305 ).json() 306 307 started = result_res["started"] 308 completed = result_res["completed"] 309 success = result_res["success"] 310 311 if not started and verbose: 312 logger.info(f"job {job_id} has not started yet") 313 314 if cleanup and completed: 315 atexit.unregister(cancel_job) 316 317 if completed: 318 result = result_res["result"] 319 if success: 320 if result is None and assert_result_is_not_none: 321 raise Exception("Result was none") 322 return result 323 else: 324 error = result["error"] 325 raise Exception(f"Job {job_id} was not successful: {str(error)}") 326 327 if timeout and ((time.time() - start_time) > timeout): 328 msg = "reached timeout" 329 logger.warning(msg) 330 self.post( 331 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 332 json={"reason": msg}, 333 ) 334 raise TimeoutError(msg) 335 if verbose: 336 logger.info(f"sleeping 0.5 seconds for {job_id = }") 337 338 time.sleep(0.5) 339 340 def cancel_running(self) -> dict: 341 """Cancel currently running executions of the same script.""" 342 logger.info("canceling running executions of this script") 343 344 jobs = self.get( 345 f"/w/{self.workspace}/jobs/list", 346 params={ 347 "running": "true", 348 "script_path_exact": self.path, 349 }, 350 ).json() 351 352 current_job_id = os.environ.get("WM_JOB_ID") 353 354 logger.debug(f"{current_job_id = }") 355 356 job_ids = [j["id"] for j in jobs if j["id"] != current_job_id] 357 358 if job_ids: 359 logger.info(f"cancelling the following job ids: {job_ids}") 360 else: 361 logger.info("no previous executions to cancel") 362 363 result = {} 364 365 for id_ in job_ids: 366 result[id_] = self.post( 367 f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}", 368 json={"reason": "killed by `cancel_running` method"}, 369 ) 370 371 return result 372 373 def get_job(self, job_id: str) -> dict: 374 return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json() 375 376 def get_root_job_id(self, job_id: str | None = None) -> dict: 377 job_id = job_id or os.environ.get("WM_JOB_ID") 378 return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json() 379 380 def get_id_token(self, audience: str) -> str: 381 return self.post(f"/w/{self.workspace}/oidc/token/{audience}").text 382 383 def get_job_status(self, job_id: str) -> JobStatus: 384 job = self.get_job(job_id) 385 job_type = job.get("type", "") 386 assert job_type, f"{job} is not a valid job" 387 if job_type.lower() == "completedjob": 388 return "COMPLETED" 389 if job.get("running"): 390 return "RUNNING" 391 return "WAITING" 392 393 def get_result( 394 self, 395 job_id: str, 396 assert_result_is_not_none: bool = True, 397 ) -> Any: 398 result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}") 399 result_text = result.text 400 if assert_result_is_not_none and result_text is None: 401 raise Exception(f"result is None for {job_id = }") 402 try: 403 return result.json() 404 except JSONDecodeError: 405 return result_text 406 407 def get_variable(self, path: str) -> str: 408 path = parse_variable_syntax(path) or path 409 if self.mocked_api is not None: 410 variables = self.mocked_api["variables"] 411 try: 412 result = variables[path] 413 return result 414 except KeyError: 415 logger.info( 416 f"MockedAPI present, but variable not found at {path}, falling back to real API" 417 ) 418 419 """Get variable from Windmill""" 420 return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json() 421 422 def set_variable(self, path: str, value: str, is_secret: bool = False) -> None: 423 path = parse_variable_syntax(path) or path 424 if self.mocked_api is not None: 425 self.mocked_api["variables"][path] = value 426 return 427 428 """Set variable from Windmill""" 429 # check if variable exists 430 r = self.get( 431 f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False 432 ) 433 if r.status_code == 404: 434 # create variable 435 self.post( 436 f"/w/{self.workspace}/variables/create", 437 json={ 438 "path": path, 439 "value": value, 440 "is_secret": is_secret, 441 "description": "", 442 }, 443 ) 444 else: 445 # update variable 446 self.post( 447 f"/w/{self.workspace}/variables/update/{path}", 448 json={"value": value}, 449 ) 450 451 def get_resource( 452 self, 453 path: str, 454 none_if_undefined: bool = False, 455 ) -> dict | None: 456 path = parse_resource_syntax(path) or path 457 if self.mocked_api is not None: 458 resources = self.mocked_api["resources"] 459 try: 460 result = resources[path] 461 return result 462 except KeyError: 463 # NOTE: should mocked_api respect `none_if_undefined`? 464 if none_if_undefined: 465 logger.info( 466 f"resource not found at ${path}, but none_if_undefined is True, so returning None" 467 ) 468 return None 469 logger.info( 470 f"MockedAPI present, but resource not found at ${path}, falling back to real API" 471 ) 472 473 """Get resource from Windmill""" 474 try: 475 return self.get( 476 f"/w/{self.workspace}/resources/get_value_interpolated/{path}" 477 ).json() 478 except Exception as e: 479 if none_if_undefined: 480 return None 481 logger.error(e) 482 raise e 483 484 def set_resource( 485 self, 486 value: Any, 487 path: str, 488 resource_type: str, 489 ): 490 path = parse_resource_syntax(path) or path 491 if self.mocked_api is not None: 492 self.mocked_api["resources"][path] = value 493 return 494 495 # check if resource exists 496 r = self.get( 497 f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False 498 ) 499 if r.status_code == 404: 500 # create resource 501 self.post( 502 f"/w/{self.workspace}/resources/create", 503 json={ 504 "path": path, 505 "value": value, 506 "resource_type": resource_type, 507 }, 508 ) 509 else: 510 # update resource 511 self.post( 512 f"/w/{self.workspace}/resources/update_value/{path}", 513 json={"value": value}, 514 ) 515 516 def set_state(self, value: Any): 517 self.set_resource(value, path=self.state_path, resource_type="state") 518 519 def set_progress(self, value: int, job_id: Optional[str] = None): 520 workspace = get_workspace() 521 flow_id = os.environ.get("WM_FLOW_JOB_ID") 522 job_id = job_id or os.environ.get("WM_JOB_ID") 523 524 if job_id != None: 525 job = self.get_job(job_id) 526 flow_id = job.get("parent_job") 527 528 self.post( 529 f"/w/{workspace}/job_metrics/set_progress/{job_id}", 530 json={ 531 "percent": value, 532 "flow_job_id": flow_id or None, 533 }, 534 ) 535 536 def get_progress(self, job_id: Optional[str] = None) -> Any: 537 workspace = get_workspace() 538 job_id = job_id or os.environ.get("WM_JOB_ID") 539 540 r = self.get( 541 f"/w/{workspace}/job_metrics/get_progress/{job_id}", 542 ) 543 if r.status_code == 404: 544 print(f"Job {job_id} does not exist") 545 return None 546 else: 547 return r.json() 548 549 def set_flow_user_state(self, key: str, value: Any) -> None: 550 """Set the user state of a flow at a given key""" 551 flow_id = self.get_root_job_id() 552 r = self.post( 553 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 554 json=value, 555 raise_for_status=False, 556 ) 557 if r.status_code == 404: 558 print(f"Job {flow_id} does not exist or is not a flow") 559 560 def get_flow_user_state(self, key: str) -> Any: 561 """Get the user state of a flow at a given key""" 562 flow_id = self.get_root_job_id() 563 r = self.get( 564 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 565 raise_for_status=False, 566 ) 567 if r.status_code == 404: 568 print(f"Job {flow_id} does not exist or is not a flow") 569 return None 570 else: 571 return r.json() 572 573 @property 574 def version(self): 575 return self.get("version").text 576 577 def get_duckdb_connection_settings( 578 self, 579 s3_resource_path: str = "", 580 ) -> DuckDbConnectionSettings | None: 581 """ 582 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 583 initiate an S3 connection from DuckDB 584 """ 585 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 586 try: 587 raw_obj = self.post( 588 f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings", 589 json={} 590 if s3_resource_path == "" 591 else {"s3_resource_path": s3_resource_path}, 592 ).json() 593 return DuckDbConnectionSettings(raw_obj) 594 except JSONDecodeError as e: 595 raise Exception( 596 "Could not generate DuckDB S3 connection settings from the provided resource" 597 ) from e 598 599 def get_polars_connection_settings( 600 self, 601 s3_resource_path: str = "", 602 ) -> PolarsConnectionSettings: 603 """ 604 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 605 initiate an S3 connection from Polars 606 """ 607 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 608 try: 609 raw_obj = self.post( 610 f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings", 611 json={} 612 if s3_resource_path == "" 613 else {"s3_resource_path": s3_resource_path}, 614 ).json() 615 return PolarsConnectionSettings(raw_obj) 616 except JSONDecodeError as e: 617 raise Exception( 618 "Could not generate Polars S3 connection settings from the provided resource" 619 ) from e 620 621 def get_boto3_connection_settings( 622 self, 623 s3_resource_path: str = "", 624 ) -> Boto3ConnectionSettings: 625 """ 626 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 627 initiate an S3 connection using boto3 628 """ 629 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 630 try: 631 s3_resource = self.post( 632 f"/w/{self.workspace}/job_helpers/v2/s3_resource_info", 633 json={} 634 if s3_resource_path == "" 635 else {"s3_resource_path": s3_resource_path}, 636 ).json() 637 return self.__boto3_connection_settings(s3_resource) 638 except JSONDecodeError as e: 639 raise Exception( 640 "Could not generate Boto3 S3 connection settings from the provided resource" 641 ) from e 642 643 def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes: 644 """ 645 Load a file from the workspace s3 bucket and returns its content as bytes. 646 647 '''python 648 from wmill import S3Object 649 650 s3_obj = S3Object(s3="/path/to/my_file.txt") 651 my_obj_content = client.load_s3_file(s3_obj) 652 file_content = my_obj_content.decode("utf-8") 653 ''' 654 """ 655 s3object = parse_s3_object(s3object) 656 with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 657 return file_reader.read() 658 659 def load_s3_file_reader( 660 self, s3object: S3Object | str, s3_resource_path: str | None 661 ) -> BufferedReader: 662 """ 663 Load a file from the workspace s3 bucket and returns the bytes stream. 664 665 '''python 666 from wmill import S3Object 667 668 s3_obj = S3Object(s3="/path/to/my_file.txt") 669 with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 670 print(file_reader.read()) 671 ''' 672 """ 673 s3object = parse_s3_object(s3object) 674 reader = S3BufferedReader( 675 f"{self.workspace}", 676 self.client, 677 s3object["s3"], 678 s3_resource_path, 679 s3object["storage"] if "storage" in s3object else None, 680 ) 681 return reader 682 683 def write_s3_file( 684 self, 685 s3object: S3Object | str | None, 686 file_content: BufferedReader | bytes, 687 s3_resource_path: str | None, 688 content_type: str | None = None, 689 content_disposition: str | None = None, 690 ) -> S3Object: 691 """ 692 Write a file to the workspace S3 bucket 693 694 '''python 695 from wmill import S3Object 696 697 s3_obj = S3Object(s3="/path/to/my_file.txt") 698 699 # for an in memory bytes array: 700 file_content = b'Hello Windmill!' 701 client.write_s3_file(s3_obj, file_content) 702 703 # for a file: 704 with open("my_file.txt", "rb") as my_file: 705 client.write_s3_file(s3_obj, my_file) 706 ''' 707 """ 708 s3object = parse_s3_object(s3object) 709 # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator 710 if isinstance(file_content, BufferedReader): 711 content_payload = bytes_generator(file_content) 712 elif isinstance(file_content, bytes): 713 content_payload = file_content 714 else: 715 raise Exception("Type of file_content not supported") 716 717 query_params = {} 718 if s3object is not None and s3object["s3"] != "": 719 query_params["file_key"] = s3object["s3"] 720 if s3_resource_path is not None and s3_resource_path != "": 721 query_params["s3_resource_path"] = s3_resource_path 722 if ( 723 s3object is not None 724 and "storage" in s3object 725 and s3object["storage"] is not None 726 ): 727 query_params["storage"] = s3object["storage"] 728 if content_type is not None: 729 query_params["content_type"] = content_type 730 if content_disposition is not None: 731 query_params["content_disposition"] = content_disposition 732 733 try: 734 # need a vanilla client b/c content-type is not application/json here 735 response = httpx.post( 736 f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file", 737 headers={ 738 "Authorization": f"Bearer {self.token}", 739 "Content-Type": "application/octet-stream", 740 }, 741 params=query_params, 742 content=content_payload, 743 verify=self.verify, 744 timeout=None, 745 ).json() 746 except Exception as e: 747 raise Exception("Could not write file to S3") from e 748 return S3Object(s3=response["file_key"]) 749 750 def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]: 751 return self.post( 752 f"/w/{self.workspace}/apps/sign_s3_objects", json={"s3_objects": list(map(parse_s3_object, s3_objects))} 753 ).json() 754 755 def sign_s3_object(self, s3_object: S3Object | str) -> S3Object: 756 return self.post( 757 f"/w/{self.workspace}/apps/sign_s3_objects", 758 json={"s3_objects": [s3_object]}, 759 ).json()[0] 760 761 def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings: 762 endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://" 763 return Boto3ConnectionSettings( 764 { 765 "endpoint_url": "{}{}".format( 766 endpoint_url_prefix, s3_resource["endPoint"] 767 ), 768 "region_name": s3_resource["region"], 769 "use_ssl": s3_resource["useSSL"], 770 "aws_access_key_id": s3_resource["accessKey"], 771 "aws_secret_access_key": s3_resource["secretKey"], 772 # no need for path_style here as boto3 is clever enough to determine which one to use 773 } 774 ) 775 776 def whoami(self) -> dict: 777 return self.get("/users/whoami").json() 778 779 @property 780 def user(self) -> dict: 781 return self.whoami() 782 783 @property 784 def state_path(self) -> str: 785 state_path = os.environ.get( 786 "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH") 787 ) 788 if state_path is None: 789 raise Exception("State path not found") 790 return state_path 791 792 @property 793 def state(self) -> Any: 794 return self.get_resource(path=self.state_path, none_if_undefined=True) 795 796 @state.setter 797 def state(self, value: Any) -> None: 798 self.set_state(value) 799 800 @staticmethod 801 def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None: 802 """ 803 Set the state in the shared folder using pickle 804 """ 805 import pickle 806 807 with open(f"/shared/{path}", "wb") as handle: 808 pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL) 809 810 @staticmethod 811 def get_shared_state_pickle(path: str = "state.pickle") -> Any: 812 """ 813 Get the state in the shared folder using pickle 814 """ 815 import pickle 816 817 with open(f"/shared/{path}", "rb") as handle: 818 return pickle.load(handle) 819 820 @staticmethod 821 def set_shared_state(value: Any, path: str = "state.json") -> None: 822 """ 823 Set the state in the shared folder using pickle 824 """ 825 import json 826 827 with open(f"/shared/{path}", "w", encoding="utf-8") as f: 828 json.dump(value, f, ensure_ascii=False, indent=4) 829 830 @staticmethod 831 def get_shared_state(path: str = "state.json") -> None: 832 """ 833 Get the state in the shared folder using pickle 834 """ 835 import json 836 837 with open(f"/shared/{path}", "r", encoding="utf-8") as f: 838 return json.load(f) 839 840 def get_resume_urls(self, approver: str = None) -> dict: 841 nonce = random.randint(0, 1000000000) 842 job_id = os.environ.get("WM_JOB_ID") or "NO_ID" 843 return self.get( 844 f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}", 845 params={"approver": approver}, 846 ).json() 847 848 def request_interactive_slack_approval( 849 self, 850 slack_resource_path: str, 851 channel_id: str, 852 message: str = None, 853 approver: str = None, 854 default_args_json: dict = None, 855 dynamic_enums_json: dict = None, 856 ) -> None: 857 """ 858 Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields. 859 860 **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality. 861 Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form 862 863 :param slack_resource_path: The path to the Slack resource in Windmill. 864 :type slack_resource_path: str 865 :param channel_id: The Slack channel ID where the approval request will be sent. 866 :type channel_id: str 867 :param message: Optional custom message to include in the Slack approval request. 868 :type message: str, optional 869 :param approver: Optional user ID or name of the approver for the request. 870 :type approver: str, optional 871 :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields. 872 :type default_args_json: dict, optional 873 :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields. 874 :type dynamic_enums_json: dict, optional 875 876 :raises Exception: If the function is not called within a flow or flow preview. 877 :raises Exception: If the required flow job or flow step environment variables are not set. 878 879 :return: None 880 881 **Usage Example:** 882 >>> client.request_interactive_slack_approval( 883 ... slack_resource_path="/u/alex/my_slack_resource", 884 ... channel_id="admins-slack-channel", 885 ... message="Please approve this request", 886 ... approver="approver123", 887 ... default_args_json={"key1": "value1", "key2": 42}, 888 ... dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]}, 889 ... ) 890 891 **Notes:** 892 - This function must be executed within a Windmill flow or flow preview. 893 - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context. 894 """ 895 workspace = self.workspace 896 flow_job_id = os.environ.get("WM_FLOW_JOB_ID") 897 898 if not flow_job_id: 899 raise Exception( 900 "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview." 901 ) 902 903 # Only include non-empty parameters 904 params = {} 905 if message: 906 params["message"] = message 907 if approver: 908 params["approver"] = approver 909 if slack_resource_path: 910 params["slack_resource_path"] = slack_resource_path 911 if channel_id: 912 params["channel_id"] = channel_id 913 if os.environ.get("WM_FLOW_STEP_ID"): 914 params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID") 915 if default_args_json: 916 params["default_args_json"] = json.dumps(default_args_json) 917 if dynamic_enums_json: 918 params["dynamic_enums_json"] = json.dumps(dynamic_enums_json) 919 920 self.get( 921 f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}", 922 params=params, 923 ) 924 925 def username_to_email(self, username: str) -> str: 926 """ 927 Get email from workspace username 928 This method is particularly useful for apps that require the email address of the viewer. 929 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 930 """ 931 return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text 932 933 def send_teams_message( 934 self, 935 conversation_id: str, 936 text: str, 937 success: bool = True, 938 card_block: dict = None, 939 ): 940 """ 941 Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message 942 """ 943 return self.post( 944 f"/teams/activities", 945 json={ 946 "conversation_id": conversation_id, 947 "text": text, 948 "success": success, 949 "card_block": card_block, 950 }, 951 ) 952 953 954def init_global_client(f): 955 @functools.wraps(f) 956 def wrapper(*args, **kwargs): 957 global _client 958 if _client is None: 959 _client = Windmill() 960 return f(*args, **kwargs) 961 962 return wrapper 963 964 965def deprecate(in_favor_of: str): 966 def decorator(f): 967 @functools.wraps(f) 968 def wrapper(*args, **kwargs): 969 warnings.warn( 970 ( 971 f"The '{f.__name__}' method is deprecated and may be removed in the future. " 972 f"Consider {in_favor_of}" 973 ), 974 DeprecationWarning, 975 ) 976 return f(*args, **kwargs) 977 978 return wrapper 979 980 return decorator 981 982 983@init_global_client 984def get_workspace() -> str: 985 return _client.workspace 986 987 988@init_global_client 989def get_root_job_id(job_id: str | None = None) -> str: 990 return _client.get_root_job_id(job_id) 991 992 993@init_global_client 994@deprecate("Windmill().version") 995def get_version() -> str: 996 return _client.version 997 998 999@init_global_client 1000def run_script_async( 1001 hash_or_path: str, 1002 args: Dict[str, Any] = None, 1003 scheduled_in_secs: int = None, 1004) -> str: 1005 is_path = "/" in hash_or_path 1006 hash_ = None if is_path else hash_or_path 1007 path = hash_or_path if is_path else None 1008 return _client.run_script_async( 1009 hash_=hash_, 1010 path=path, 1011 args=args, 1012 scheduled_in_secs=scheduled_in_secs, 1013 ) 1014 1015 1016@init_global_client 1017def run_flow_async( 1018 path: str, 1019 args: Dict[str, Any] = None, 1020 scheduled_in_secs: int = None, 1021 # can only be set to false if this the job will be fully await and not concurrent with any other job 1022 # as otherwise the child flow and its own child will store their state in the parent job which will 1023 # lead to incorrectness and failures 1024 do_not_track_in_parent: bool = True, 1025) -> str: 1026 return _client.run_flow_async( 1027 path=path, 1028 args=args, 1029 scheduled_in_secs=scheduled_in_secs, 1030 do_not_track_in_parent=do_not_track_in_parent, 1031 ) 1032 1033 1034@init_global_client 1035def run_script_sync( 1036 hash: str, 1037 args: Dict[str, Any] = None, 1038 verbose: bool = False, 1039 assert_result_is_not_none: bool = True, 1040 cleanup: bool = True, 1041 timeout: dt.timedelta = None, 1042) -> Any: 1043 return _client.run_script( 1044 hash_=hash, 1045 args=args, 1046 verbose=verbose, 1047 assert_result_is_not_none=assert_result_is_not_none, 1048 cleanup=cleanup, 1049 timeout=timeout, 1050 ) 1051 1052 1053@init_global_client 1054def run_script_by_path_async( 1055 path: str, 1056 args: Dict[str, Any] = None, 1057 scheduled_in_secs: Union[None, int] = None, 1058) -> str: 1059 return _client.run_script_by_path_async( 1060 path=path, 1061 args=args, 1062 scheduled_in_secs=scheduled_in_secs, 1063 ) 1064 1065 1066@init_global_client 1067def run_script_by_hash_async( 1068 hash_: str, 1069 args: Dict[str, Any] = None, 1070 scheduled_in_secs: Union[None, int] = None, 1071) -> str: 1072 return _client.run_script_by_hash_async( 1073 hash_=hash_, 1074 args=args, 1075 scheduled_in_secs=scheduled_in_secs, 1076 ) 1077 1078 1079@init_global_client 1080def run_script_by_path_sync( 1081 path: str, 1082 args: Dict[str, Any] = None, 1083 verbose: bool = False, 1084 assert_result_is_not_none: bool = True, 1085 cleanup: bool = True, 1086 timeout: dt.timedelta = None, 1087) -> Any: 1088 return _client.run_script( 1089 path=path, 1090 args=args, 1091 verbose=verbose, 1092 assert_result_is_not_none=assert_result_is_not_none, 1093 cleanup=cleanup, 1094 timeout=timeout, 1095 ) 1096 1097 1098@init_global_client 1099def get_id_token(audience: str) -> str: 1100 """ 1101 Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc. 1102 """ 1103 return _client.get_id_token(audience) 1104 1105 1106@init_global_client 1107def get_job_status(job_id: str) -> JobStatus: 1108 return _client.get_job_status(job_id) 1109 1110 1111@init_global_client 1112def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]: 1113 return _client.get_result( 1114 job_id=job_id, assert_result_is_not_none=assert_result_is_not_none 1115 ) 1116 1117 1118@init_global_client 1119def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings: 1120 """ 1121 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1122 initiate an S3 connection from DuckDB 1123 """ 1124 return _client.get_duckdb_connection_settings(s3_resource_path) 1125 1126 1127@init_global_client 1128def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings: 1129 """ 1130 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1131 initiate an S3 connection from Polars 1132 """ 1133 return _client.get_polars_connection_settings(s3_resource_path) 1134 1135 1136@init_global_client 1137def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings: 1138 """ 1139 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1140 initiate an S3 connection using boto3 1141 """ 1142 return _client.get_boto3_connection_settings(s3_resource_path) 1143 1144 1145@init_global_client 1146def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes: 1147 """ 1148 Load the entire content of a file stored in S3 as bytes 1149 """ 1150 return _client.load_s3_file( 1151 s3object, s3_resource_path if s3_resource_path != "" else None 1152 ) 1153 1154 1155@init_global_client 1156def load_s3_file_reader( 1157 s3object: S3Object | str, s3_resource_path: str | None = None 1158) -> BufferedReader: 1159 """ 1160 Load the content of a file stored in S3 1161 """ 1162 return _client.load_s3_file_reader( 1163 s3object, s3_resource_path if s3_resource_path != "" else None 1164 ) 1165 1166 1167@init_global_client 1168def write_s3_file( 1169 s3object: S3Object | str | None, 1170 file_content: BufferedReader | bytes, 1171 s3_resource_path: str | None = None, 1172 content_type: str | None = None, 1173 content_disposition: str | None = None, 1174) -> S3Object: 1175 """ 1176 Upload a file to S3 1177 1178 Content type will be automatically guessed from path extension if left empty 1179 1180 See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition 1181 and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type 1182 1183 """ 1184 return _client.write_s3_file( 1185 s3object, 1186 file_content, 1187 s3_resource_path if s3_resource_path != "" else None, 1188 content_type, 1189 content_disposition, 1190 ) 1191 1192 1193@init_global_client 1194def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]: 1195 """ 1196 Sign S3 objects to be used by anonymous users in public apps 1197 Returns a list of signed s3 tokens 1198 """ 1199 return _client.sign_s3_objects(s3_objects) 1200 1201 1202@init_global_client 1203def sign_s3_object(s3_object: S3Object| str) -> S3Object: 1204 """ 1205 Sign S3 object to be used by anonymous users in public apps 1206 Returns a signed s3 object 1207 """ 1208 return _client.sign_s3_object(s3_object) 1209 1210 1211@init_global_client 1212def whoami() -> dict: 1213 """ 1214 Returns the current user 1215 """ 1216 return _client.user 1217 1218 1219@init_global_client 1220@deprecate("Windmill().state") 1221def get_state() -> Any: 1222 """ 1223 Get the state 1224 """ 1225 return _client.state 1226 1227 1228@init_global_client 1229def get_resource( 1230 path: str, 1231 none_if_undefined: bool = False, 1232) -> dict | None: 1233 """Get resource from Windmill""" 1234 return _client.get_resource(path, none_if_undefined) 1235 1236 1237@init_global_client 1238def set_resource(path: str, value: Any, resource_type: str = "any") -> None: 1239 """ 1240 Set the resource at a given path as a string, creating it if it does not exist 1241 """ 1242 return _client.set_resource(value=value, path=path, resource_type=resource_type) 1243 1244 1245@init_global_client 1246def set_state(value: Any) -> None: 1247 """ 1248 Set the state 1249 """ 1250 return _client.set_state(value) 1251 1252 1253@init_global_client 1254def set_progress(value: int, job_id: Optional[str] = None) -> None: 1255 """ 1256 Set the progress 1257 """ 1258 return _client.set_progress(value, job_id) 1259 1260 1261@init_global_client 1262def get_progress(job_id: Optional[str] = None) -> Any: 1263 """ 1264 Get the progress 1265 """ 1266 return _client.get_progress(job_id) 1267 1268 1269def set_shared_state_pickle(value: Any, path="state.pickle") -> None: 1270 """ 1271 Set the state in the shared folder using pickle 1272 """ 1273 return Windmill.set_shared_state_pickle(value=value, path=path) 1274 1275 1276@deprecate("Windmill.get_shared_state_pickle(...)") 1277def get_shared_state_pickle(path="state.pickle") -> Any: 1278 """ 1279 Get the state in the shared folder using pickle 1280 """ 1281 return Windmill.get_shared_state_pickle(path=path) 1282 1283 1284def set_shared_state(value: Any, path="state.json") -> None: 1285 """ 1286 Set the state in the shared folder using pickle 1287 """ 1288 return Windmill.set_shared_state(value=value, path=path) 1289 1290 1291def get_shared_state(path="state.json") -> None: 1292 """ 1293 Get the state in the shared folder using pickle 1294 """ 1295 return Windmill.get_shared_state(path=path) 1296 1297 1298@init_global_client 1299def get_variable(path: str) -> str: 1300 """ 1301 Returns the variable at a given path as a string 1302 """ 1303 return _client.get_variable(path) 1304 1305 1306@init_global_client 1307def set_variable(path: str, value: str, is_secret: bool = False) -> None: 1308 """ 1309 Set the variable at a given path as a string, creating it if it does not exist 1310 """ 1311 return _client.set_variable(path, value, is_secret) 1312 1313 1314@init_global_client 1315def get_flow_user_state(key: str) -> Any: 1316 """ 1317 Get the user state of a flow at a given key 1318 """ 1319 return _client.get_flow_user_state(key) 1320 1321 1322@init_global_client 1323def set_flow_user_state(key: str, value: Any) -> None: 1324 """ 1325 Set the user state of a flow at a given key 1326 """ 1327 return _client.set_flow_user_state(key, value) 1328 1329 1330@init_global_client 1331def get_state_path() -> str: 1332 return _client.state_path 1333 1334 1335@init_global_client 1336def get_resume_urls(approver: str = None) -> dict: 1337 return _client.get_resume_urls(approver) 1338 1339 1340@init_global_client 1341def request_interactive_slack_approval( 1342 slack_resource_path: str, 1343 channel_id: str, 1344 message: str = None, 1345 approver: str = None, 1346 default_args_json: dict = None, 1347 dynamic_enums_json: dict = None, 1348) -> None: 1349 return _client.request_interactive_slack_approval( 1350 slack_resource_path=slack_resource_path, 1351 channel_id=channel_id, 1352 message=message, 1353 approver=approver, 1354 default_args_json=default_args_json, 1355 dynamic_enums_json=dynamic_enums_json, 1356 ) 1357 1358 1359@init_global_client 1360def send_teams_message( 1361 conversation_id: str, text: str, success: bool, card_block: dict = None 1362): 1363 return _client.send_teams_message(conversation_id, text, success, card_block) 1364 1365 1366@init_global_client 1367def cancel_running() -> dict: 1368 """Cancel currently running executions of the same script.""" 1369 return _client.cancel_running() 1370 1371 1372@init_global_client 1373def run_script( 1374 path: str = None, 1375 hash_: str = None, 1376 args: dict = None, 1377 timeout: dt.timedelta | int | float = None, 1378 verbose: bool = False, 1379 cleanup: bool = True, 1380 assert_result_is_not_none: bool = True, 1381) -> Any: 1382 """Run script synchronously and return its result. 1383 1384 .. deprecated:: Use run_script_by_path or run_script_by_hash instead. 1385 """ 1386 return _client.run_script( 1387 path=path, 1388 hash_=hash_, 1389 args=args, 1390 verbose=verbose, 1391 assert_result_is_not_none=assert_result_is_not_none, 1392 cleanup=cleanup, 1393 timeout=timeout, 1394 ) 1395 1396 1397@init_global_client 1398def run_script_by_path( 1399 path: str, 1400 args: dict = None, 1401 timeout: dt.timedelta | int | float = None, 1402 verbose: bool = False, 1403 cleanup: bool = True, 1404 assert_result_is_not_none: bool = True, 1405) -> Any: 1406 """Run script by path synchronously and return its result.""" 1407 return _client.run_script_by_path( 1408 path=path, 1409 args=args, 1410 verbose=verbose, 1411 assert_result_is_not_none=assert_result_is_not_none, 1412 cleanup=cleanup, 1413 timeout=timeout, 1414 ) 1415 1416 1417@init_global_client 1418def run_script_by_hash( 1419 hash_: str, 1420 args: dict = None, 1421 timeout: dt.timedelta | int | float = None, 1422 verbose: bool = False, 1423 cleanup: bool = True, 1424 assert_result_is_not_none: bool = True, 1425) -> Any: 1426 """Run script by hash synchronously and return its result.""" 1427 return _client.run_script_by_hash( 1428 hash_=hash_, 1429 args=args, 1430 verbose=verbose, 1431 assert_result_is_not_none=assert_result_is_not_none, 1432 cleanup=cleanup, 1433 timeout=timeout, 1434 ) 1435 1436 1437@init_global_client 1438def username_to_email(username: str) -> str: 1439 """ 1440 Get email from workspace username 1441 This method is particularly useful for apps that require the email address of the viewer. 1442 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 1443 """ 1444 return _client.username_to_email(username) 1445 1446 1447def task(*args, **kwargs): 1448 from inspect import signature 1449 1450 def f(func, tag: str | None = None): 1451 if ( 1452 os.environ.get("WM_JOB_ID") is None 1453 or os.environ.get("MAIN_OVERRIDE") == func.__name__ 1454 ): 1455 1456 def inner(*args, **kwargs): 1457 return func(*args, **kwargs) 1458 1459 return inner 1460 else: 1461 1462 def inner(*args, **kwargs): 1463 global _client 1464 if _client is None: 1465 _client = Windmill() 1466 w_id = os.environ.get("WM_WORKSPACE") 1467 job_id = os.environ.get("WM_JOB_ID") 1468 f_name = func.__name__ 1469 json = kwargs 1470 params = list(signature(func).parameters) 1471 for i, arg in enumerate(args): 1472 if i < len(params): 1473 p = params[i] 1474 key = p 1475 if key not in kwargs: 1476 json[key] = arg 1477 1478 params = {} 1479 if tag is not None: 1480 params["tag"] = tag 1481 w_as_code_response = _client.post( 1482 f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}", 1483 json={"args": json}, 1484 params=params, 1485 ) 1486 job_id = w_as_code_response.text 1487 print(f"Executing task {func.__name__} on job {job_id}") 1488 job_result = _client.wait_job(job_id) 1489 print(f"Task {func.__name__} ({job_id}) completed") 1490 return job_result 1491 1492 return inner 1493 1494 if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): 1495 return f(args[0], None) 1496 else: 1497 return lambda x: f(x, kwargs.get("tag")) 1498 1499def parse_resource_syntax(s: str) -> Optional[str]: 1500 """Parse resource syntax from string.""" 1501 if s is None: 1502 return None 1503 if s.startswith("$res:"): 1504 return s[5:] 1505 if s.startswith("res://"): 1506 return s[6:] 1507 return None 1508 1509def parse_s3_object(s3_object: S3Object | str) -> S3Object: 1510 """Parse S3 object from string or S3Object format.""" 1511 if isinstance(s3_object, str): 1512 match = re.match(r'^s3://([^/]*)/(.*)$', s3_object) 1513 if match: 1514 return S3Object(s3=match.group(2) or "", storage=match.group(1) or None) 1515 return S3Object(s3="") 1516 else: 1517 return s3_object 1518 1519 1520 1521def parse_variable_syntax(s: str) -> Optional[str]: 1522 """Parse variable syntax from string.""" 1523 if s.startswith("var://"): 1524 return s[6:] 1525 return None 1526 1527 1528def append_to_result_stream(text: str) -> None: 1529 """Append a text to the result stream. 1530 1531 Args: 1532 text: text to append to the result stream 1533 """ 1534 print("WM_STREAM: {}".format(text.replace(chr(10), '\\n'))) 1535 1536def stream_result(stream) -> None: 1537 """Stream to the result stream. 1538 1539 Args: 1540 stream: stream to stream to the result stream 1541 """ 1542 for text in stream: 1543 append_to_result_stream(text)
35class Windmill: 36 def __init__(self, base_url=None, token=None, workspace=None, verify=True): 37 base = ( 38 base_url 39 or os.environ.get("BASE_INTERNAL_URL") 40 or os.environ.get("WM_BASE_URL") 41 ) 42 43 self.base_url = f"{base}/api" 44 self.token = token or os.environ.get("WM_TOKEN") 45 self.headers = { 46 "Content-Type": "application/json", 47 "Authorization": f"Bearer {self.token}", 48 } 49 self.verify = verify 50 self.client = self.get_client() 51 self.workspace = workspace or os.environ.get("WM_WORKSPACE") 52 self.path = os.environ.get("WM_JOB_PATH") 53 54 self.mocked_api = self.get_mocked_api() 55 56 assert self.workspace, ( 57 f"workspace required as an argument or as WM_WORKSPACE environment variable" 58 ) 59 60 def get_mocked_api(self) -> Optional[dict]: 61 mocked_path = os.environ.get("WM_MOCKED_API_FILE") 62 if not mocked_path: 63 return None 64 logger.info("Using mocked API from %s", mocked_path) 65 mocked_api = {"variables": {}, "resources": {}} 66 try: 67 with open(mocked_path, "r") as f: 68 incoming_mocked_api = json.load(f) 69 mocked_api = {**mocked_api, **incoming_mocked_api} 70 except Exception as e: 71 logger.warning( 72 "Error parsing mocked API file at path %s Using empty mocked API.", 73 mocked_path, 74 ) 75 logger.debug(e) 76 return mocked_api 77 78 def get_client(self) -> httpx.Client: 79 return httpx.Client( 80 base_url=self.base_url, 81 headers=self.headers, 82 verify=self.verify, 83 ) 84 85 def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 86 endpoint = endpoint.lstrip("/") 87 resp = self.client.get(f"/{endpoint}", **kwargs) 88 if raise_for_status: 89 try: 90 resp.raise_for_status() 91 except httpx.HTTPStatusError as err: 92 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 93 logger.error(error) 94 raise Exception(error) 95 return resp 96 97 def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 98 endpoint = endpoint.lstrip("/") 99 resp = self.client.post(f"/{endpoint}", **kwargs) 100 if raise_for_status: 101 try: 102 resp.raise_for_status() 103 except httpx.HTTPStatusError as err: 104 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 105 logger.error(error) 106 raise Exception(error) 107 return resp 108 109 def create_token(self, duration=dt.timedelta(days=1)) -> str: 110 endpoint = "/users/tokens/create" 111 payload = { 112 "label": f"refresh {time.time()}", 113 "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"), 114 } 115 return self.post(endpoint, json=payload).text 116 117 def run_script_async( 118 self, 119 path: str = None, 120 hash_: str = None, 121 args: dict = None, 122 scheduled_in_secs: int = None, 123 ) -> str: 124 """Create a script job and return its job id. 125 126 .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead. 127 """ 128 logging.warning( 129 "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.", 130 ) 131 assert not (path and hash_), "path and hash_ are mutually exclusive" 132 return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs) 133 134 def _run_script_async_internal( 135 self, 136 path: str = None, 137 hash_: str = None, 138 args: dict = None, 139 scheduled_in_secs: int = None, 140 ) -> str: 141 """Internal helper for running scripts asynchronously.""" 142 args = args or {} 143 params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {} 144 if os.environ.get("WM_JOB_ID"): 145 params["parent_job"] = os.environ.get("WM_JOB_ID") 146 if os.environ.get("WM_ROOT_FLOW_JOB_ID"): 147 params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID") 148 149 if path: 150 endpoint = f"/w/{self.workspace}/jobs/run/p/{path}" 151 elif hash_: 152 endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}" 153 else: 154 raise Exception("path or hash_ must be provided") 155 156 return self.post(endpoint, json=args, params=params).text 157 158 def run_script_by_path_async( 159 self, 160 path: str, 161 args: dict = None, 162 scheduled_in_secs: int = None, 163 ) -> str: 164 """Create a script job by path and return its job id.""" 165 return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs) 166 167 def run_script_by_hash_async( 168 self, 169 hash_: str, 170 args: dict = None, 171 scheduled_in_secs: int = None, 172 ) -> str: 173 """Create a script job by hash and return its job id.""" 174 return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs) 175 176 def run_flow_async( 177 self, 178 path: str, 179 args: dict = None, 180 scheduled_in_secs: int = None, 181 # can only be set to false if this the job will be fully await and not concurrent with any other job 182 # as otherwise the child flow and its own child will store their state in the parent job which will 183 # lead to incorrectness and failures 184 do_not_track_in_parent: bool = True, 185 ) -> str: 186 """Create a flow job and return its job id.""" 187 args = args or {} 188 params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {} 189 if not do_not_track_in_parent: 190 if os.environ.get("WM_JOB_ID"): 191 params["parent_job"] = os.environ.get("WM_JOB_ID") 192 if os.environ.get("WM_ROOT_FLOW_JOB_ID"): 193 params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID") 194 if path: 195 endpoint = f"/w/{self.workspace}/jobs/run/f/{path}" 196 else: 197 raise Exception("path must be provided") 198 return self.post(endpoint, json=args, params=params).text 199 200 def run_script( 201 self, 202 path: str = None, 203 hash_: str = None, 204 args: dict = None, 205 timeout: dt.timedelta | int | float | None = None, 206 verbose: bool = False, 207 cleanup: bool = True, 208 assert_result_is_not_none: bool = False, 209 ) -> Any: 210 """Run script synchronously and return its result. 211 212 .. deprecated:: Use run_script_by_path or run_script_by_hash instead. 213 """ 214 logging.warning( 215 "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.", 216 ) 217 assert not (path and hash_), "path and hash_ are mutually exclusive" 218 return self._run_script_internal( 219 path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose, 220 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 221 ) 222 223 def _run_script_internal( 224 self, 225 path: str = None, 226 hash_: str = None, 227 args: dict = None, 228 timeout: dt.timedelta | int | float | None = None, 229 verbose: bool = False, 230 cleanup: bool = True, 231 assert_result_is_not_none: bool = False, 232 ) -> Any: 233 """Internal helper for running scripts synchronously.""" 234 args = args or {} 235 236 if verbose: 237 if path: 238 logger.info(f"running `{path}` synchronously with {args = }") 239 elif hash_: 240 logger.info(f"running script with hash `{hash_}` synchronously with {args = }") 241 242 if isinstance(timeout, dt.timedelta): 243 timeout = timeout.total_seconds() 244 245 job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args) 246 return self.wait_job( 247 job_id, timeout, verbose, cleanup, assert_result_is_not_none 248 ) 249 250 def run_script_by_path( 251 self, 252 path: str, 253 args: dict = None, 254 timeout: dt.timedelta | int | float | None = None, 255 verbose: bool = False, 256 cleanup: bool = True, 257 assert_result_is_not_none: bool = False, 258 ) -> Any: 259 """Run script by path synchronously and return its result.""" 260 return self._run_script_internal( 261 path=path, args=args, timeout=timeout, verbose=verbose, 262 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 263 ) 264 265 def run_script_by_hash( 266 self, 267 hash_: str, 268 args: dict = None, 269 timeout: dt.timedelta | int | float | None = None, 270 verbose: bool = False, 271 cleanup: bool = True, 272 assert_result_is_not_none: bool = False, 273 ) -> Any: 274 """Run script by hash synchronously and return its result.""" 275 return self._run_script_internal( 276 hash_=hash_, args=args, timeout=timeout, verbose=verbose, 277 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 278 ) 279 280 def wait_job( 281 self, 282 job_id, 283 timeout: dt.timedelta | int | float | None = None, 284 verbose: bool = False, 285 cleanup: bool = True, 286 assert_result_is_not_none: bool = False, 287 ): 288 def cancel_job(): 289 logger.warning(f"cancelling job: {job_id}") 290 self.post( 291 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 292 json={"reason": "parent script cancelled"}, 293 ).raise_for_status() 294 295 if cleanup: 296 atexit.register(cancel_job) 297 298 start_time = time.time() 299 300 if isinstance(timeout, dt.timedelta): 301 timeout = timeout.total_seconds() 302 303 while True: 304 result_res = self.get( 305 f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True 306 ).json() 307 308 started = result_res["started"] 309 completed = result_res["completed"] 310 success = result_res["success"] 311 312 if not started and verbose: 313 logger.info(f"job {job_id} has not started yet") 314 315 if cleanup and completed: 316 atexit.unregister(cancel_job) 317 318 if completed: 319 result = result_res["result"] 320 if success: 321 if result is None and assert_result_is_not_none: 322 raise Exception("Result was none") 323 return result 324 else: 325 error = result["error"] 326 raise Exception(f"Job {job_id} was not successful: {str(error)}") 327 328 if timeout and ((time.time() - start_time) > timeout): 329 msg = "reached timeout" 330 logger.warning(msg) 331 self.post( 332 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 333 json={"reason": msg}, 334 ) 335 raise TimeoutError(msg) 336 if verbose: 337 logger.info(f"sleeping 0.5 seconds for {job_id = }") 338 339 time.sleep(0.5) 340 341 def cancel_running(self) -> dict: 342 """Cancel currently running executions of the same script.""" 343 logger.info("canceling running executions of this script") 344 345 jobs = self.get( 346 f"/w/{self.workspace}/jobs/list", 347 params={ 348 "running": "true", 349 "script_path_exact": self.path, 350 }, 351 ).json() 352 353 current_job_id = os.environ.get("WM_JOB_ID") 354 355 logger.debug(f"{current_job_id = }") 356 357 job_ids = [j["id"] for j in jobs if j["id"] != current_job_id] 358 359 if job_ids: 360 logger.info(f"cancelling the following job ids: {job_ids}") 361 else: 362 logger.info("no previous executions to cancel") 363 364 result = {} 365 366 for id_ in job_ids: 367 result[id_] = self.post( 368 f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}", 369 json={"reason": "killed by `cancel_running` method"}, 370 ) 371 372 return result 373 374 def get_job(self, job_id: str) -> dict: 375 return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json() 376 377 def get_root_job_id(self, job_id: str | None = None) -> dict: 378 job_id = job_id or os.environ.get("WM_JOB_ID") 379 return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json() 380 381 def get_id_token(self, audience: str) -> str: 382 return self.post(f"/w/{self.workspace}/oidc/token/{audience}").text 383 384 def get_job_status(self, job_id: str) -> JobStatus: 385 job = self.get_job(job_id) 386 job_type = job.get("type", "") 387 assert job_type, f"{job} is not a valid job" 388 if job_type.lower() == "completedjob": 389 return "COMPLETED" 390 if job.get("running"): 391 return "RUNNING" 392 return "WAITING" 393 394 def get_result( 395 self, 396 job_id: str, 397 assert_result_is_not_none: bool = True, 398 ) -> Any: 399 result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}") 400 result_text = result.text 401 if assert_result_is_not_none and result_text is None: 402 raise Exception(f"result is None for {job_id = }") 403 try: 404 return result.json() 405 except JSONDecodeError: 406 return result_text 407 408 def get_variable(self, path: str) -> str: 409 path = parse_variable_syntax(path) or path 410 if self.mocked_api is not None: 411 variables = self.mocked_api["variables"] 412 try: 413 result = variables[path] 414 return result 415 except KeyError: 416 logger.info( 417 f"MockedAPI present, but variable not found at {path}, falling back to real API" 418 ) 419 420 """Get variable from Windmill""" 421 return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json() 422 423 def set_variable(self, path: str, value: str, is_secret: bool = False) -> None: 424 path = parse_variable_syntax(path) or path 425 if self.mocked_api is not None: 426 self.mocked_api["variables"][path] = value 427 return 428 429 """Set variable from Windmill""" 430 # check if variable exists 431 r = self.get( 432 f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False 433 ) 434 if r.status_code == 404: 435 # create variable 436 self.post( 437 f"/w/{self.workspace}/variables/create", 438 json={ 439 "path": path, 440 "value": value, 441 "is_secret": is_secret, 442 "description": "", 443 }, 444 ) 445 else: 446 # update variable 447 self.post( 448 f"/w/{self.workspace}/variables/update/{path}", 449 json={"value": value}, 450 ) 451 452 def get_resource( 453 self, 454 path: str, 455 none_if_undefined: bool = False, 456 ) -> dict | None: 457 path = parse_resource_syntax(path) or path 458 if self.mocked_api is not None: 459 resources = self.mocked_api["resources"] 460 try: 461 result = resources[path] 462 return result 463 except KeyError: 464 # NOTE: should mocked_api respect `none_if_undefined`? 465 if none_if_undefined: 466 logger.info( 467 f"resource not found at ${path}, but none_if_undefined is True, so returning None" 468 ) 469 return None 470 logger.info( 471 f"MockedAPI present, but resource not found at ${path}, falling back to real API" 472 ) 473 474 """Get resource from Windmill""" 475 try: 476 return self.get( 477 f"/w/{self.workspace}/resources/get_value_interpolated/{path}" 478 ).json() 479 except Exception as e: 480 if none_if_undefined: 481 return None 482 logger.error(e) 483 raise e 484 485 def set_resource( 486 self, 487 value: Any, 488 path: str, 489 resource_type: str, 490 ): 491 path = parse_resource_syntax(path) or path 492 if self.mocked_api is not None: 493 self.mocked_api["resources"][path] = value 494 return 495 496 # check if resource exists 497 r = self.get( 498 f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False 499 ) 500 if r.status_code == 404: 501 # create resource 502 self.post( 503 f"/w/{self.workspace}/resources/create", 504 json={ 505 "path": path, 506 "value": value, 507 "resource_type": resource_type, 508 }, 509 ) 510 else: 511 # update resource 512 self.post( 513 f"/w/{self.workspace}/resources/update_value/{path}", 514 json={"value": value}, 515 ) 516 517 def set_state(self, value: Any): 518 self.set_resource(value, path=self.state_path, resource_type="state") 519 520 def set_progress(self, value: int, job_id: Optional[str] = None): 521 workspace = get_workspace() 522 flow_id = os.environ.get("WM_FLOW_JOB_ID") 523 job_id = job_id or os.environ.get("WM_JOB_ID") 524 525 if job_id != None: 526 job = self.get_job(job_id) 527 flow_id = job.get("parent_job") 528 529 self.post( 530 f"/w/{workspace}/job_metrics/set_progress/{job_id}", 531 json={ 532 "percent": value, 533 "flow_job_id": flow_id or None, 534 }, 535 ) 536 537 def get_progress(self, job_id: Optional[str] = None) -> Any: 538 workspace = get_workspace() 539 job_id = job_id or os.environ.get("WM_JOB_ID") 540 541 r = self.get( 542 f"/w/{workspace}/job_metrics/get_progress/{job_id}", 543 ) 544 if r.status_code == 404: 545 print(f"Job {job_id} does not exist") 546 return None 547 else: 548 return r.json() 549 550 def set_flow_user_state(self, key: str, value: Any) -> None: 551 """Set the user state of a flow at a given key""" 552 flow_id = self.get_root_job_id() 553 r = self.post( 554 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 555 json=value, 556 raise_for_status=False, 557 ) 558 if r.status_code == 404: 559 print(f"Job {flow_id} does not exist or is not a flow") 560 561 def get_flow_user_state(self, key: str) -> Any: 562 """Get the user state of a flow at a given key""" 563 flow_id = self.get_root_job_id() 564 r = self.get( 565 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 566 raise_for_status=False, 567 ) 568 if r.status_code == 404: 569 print(f"Job {flow_id} does not exist or is not a flow") 570 return None 571 else: 572 return r.json() 573 574 @property 575 def version(self): 576 return self.get("version").text 577 578 def get_duckdb_connection_settings( 579 self, 580 s3_resource_path: str = "", 581 ) -> DuckDbConnectionSettings | None: 582 """ 583 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 584 initiate an S3 connection from DuckDB 585 """ 586 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 587 try: 588 raw_obj = self.post( 589 f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings", 590 json={} 591 if s3_resource_path == "" 592 else {"s3_resource_path": s3_resource_path}, 593 ).json() 594 return DuckDbConnectionSettings(raw_obj) 595 except JSONDecodeError as e: 596 raise Exception( 597 "Could not generate DuckDB S3 connection settings from the provided resource" 598 ) from e 599 600 def get_polars_connection_settings( 601 self, 602 s3_resource_path: str = "", 603 ) -> PolarsConnectionSettings: 604 """ 605 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 606 initiate an S3 connection from Polars 607 """ 608 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 609 try: 610 raw_obj = self.post( 611 f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings", 612 json={} 613 if s3_resource_path == "" 614 else {"s3_resource_path": s3_resource_path}, 615 ).json() 616 return PolarsConnectionSettings(raw_obj) 617 except JSONDecodeError as e: 618 raise Exception( 619 "Could not generate Polars S3 connection settings from the provided resource" 620 ) from e 621 622 def get_boto3_connection_settings( 623 self, 624 s3_resource_path: str = "", 625 ) -> Boto3ConnectionSettings: 626 """ 627 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 628 initiate an S3 connection using boto3 629 """ 630 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 631 try: 632 s3_resource = self.post( 633 f"/w/{self.workspace}/job_helpers/v2/s3_resource_info", 634 json={} 635 if s3_resource_path == "" 636 else {"s3_resource_path": s3_resource_path}, 637 ).json() 638 return self.__boto3_connection_settings(s3_resource) 639 except JSONDecodeError as e: 640 raise Exception( 641 "Could not generate Boto3 S3 connection settings from the provided resource" 642 ) from e 643 644 def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes: 645 """ 646 Load a file from the workspace s3 bucket and returns its content as bytes. 647 648 '''python 649 from wmill import S3Object 650 651 s3_obj = S3Object(s3="/path/to/my_file.txt") 652 my_obj_content = client.load_s3_file(s3_obj) 653 file_content = my_obj_content.decode("utf-8") 654 ''' 655 """ 656 s3object = parse_s3_object(s3object) 657 with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 658 return file_reader.read() 659 660 def load_s3_file_reader( 661 self, s3object: S3Object | str, s3_resource_path: str | None 662 ) -> BufferedReader: 663 """ 664 Load a file from the workspace s3 bucket and returns the bytes stream. 665 666 '''python 667 from wmill import S3Object 668 669 s3_obj = S3Object(s3="/path/to/my_file.txt") 670 with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 671 print(file_reader.read()) 672 ''' 673 """ 674 s3object = parse_s3_object(s3object) 675 reader = S3BufferedReader( 676 f"{self.workspace}", 677 self.client, 678 s3object["s3"], 679 s3_resource_path, 680 s3object["storage"] if "storage" in s3object else None, 681 ) 682 return reader 683 684 def write_s3_file( 685 self, 686 s3object: S3Object | str | None, 687 file_content: BufferedReader | bytes, 688 s3_resource_path: str | None, 689 content_type: str | None = None, 690 content_disposition: str | None = None, 691 ) -> S3Object: 692 """ 693 Write a file to the workspace S3 bucket 694 695 '''python 696 from wmill import S3Object 697 698 s3_obj = S3Object(s3="/path/to/my_file.txt") 699 700 # for an in memory bytes array: 701 file_content = b'Hello Windmill!' 702 client.write_s3_file(s3_obj, file_content) 703 704 # for a file: 705 with open("my_file.txt", "rb") as my_file: 706 client.write_s3_file(s3_obj, my_file) 707 ''' 708 """ 709 s3object = parse_s3_object(s3object) 710 # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator 711 if isinstance(file_content, BufferedReader): 712 content_payload = bytes_generator(file_content) 713 elif isinstance(file_content, bytes): 714 content_payload = file_content 715 else: 716 raise Exception("Type of file_content not supported") 717 718 query_params = {} 719 if s3object is not None and s3object["s3"] != "": 720 query_params["file_key"] = s3object["s3"] 721 if s3_resource_path is not None and s3_resource_path != "": 722 query_params["s3_resource_path"] = s3_resource_path 723 if ( 724 s3object is not None 725 and "storage" in s3object 726 and s3object["storage"] is not None 727 ): 728 query_params["storage"] = s3object["storage"] 729 if content_type is not None: 730 query_params["content_type"] = content_type 731 if content_disposition is not None: 732 query_params["content_disposition"] = content_disposition 733 734 try: 735 # need a vanilla client b/c content-type is not application/json here 736 response = httpx.post( 737 f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file", 738 headers={ 739 "Authorization": f"Bearer {self.token}", 740 "Content-Type": "application/octet-stream", 741 }, 742 params=query_params, 743 content=content_payload, 744 verify=self.verify, 745 timeout=None, 746 ).json() 747 except Exception as e: 748 raise Exception("Could not write file to S3") from e 749 return S3Object(s3=response["file_key"]) 750 751 def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]: 752 return self.post( 753 f"/w/{self.workspace}/apps/sign_s3_objects", json={"s3_objects": list(map(parse_s3_object, s3_objects))} 754 ).json() 755 756 def sign_s3_object(self, s3_object: S3Object | str) -> S3Object: 757 return self.post( 758 f"/w/{self.workspace}/apps/sign_s3_objects", 759 json={"s3_objects": [s3_object]}, 760 ).json()[0] 761 762 def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings: 763 endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://" 764 return Boto3ConnectionSettings( 765 { 766 "endpoint_url": "{}{}".format( 767 endpoint_url_prefix, s3_resource["endPoint"] 768 ), 769 "region_name": s3_resource["region"], 770 "use_ssl": s3_resource["useSSL"], 771 "aws_access_key_id": s3_resource["accessKey"], 772 "aws_secret_access_key": s3_resource["secretKey"], 773 # no need for path_style here as boto3 is clever enough to determine which one to use 774 } 775 ) 776 777 def whoami(self) -> dict: 778 return self.get("/users/whoami").json() 779 780 @property 781 def user(self) -> dict: 782 return self.whoami() 783 784 @property 785 def state_path(self) -> str: 786 state_path = os.environ.get( 787 "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH") 788 ) 789 if state_path is None: 790 raise Exception("State path not found") 791 return state_path 792 793 @property 794 def state(self) -> Any: 795 return self.get_resource(path=self.state_path, none_if_undefined=True) 796 797 @state.setter 798 def state(self, value: Any) -> None: 799 self.set_state(value) 800 801 @staticmethod 802 def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None: 803 """ 804 Set the state in the shared folder using pickle 805 """ 806 import pickle 807 808 with open(f"/shared/{path}", "wb") as handle: 809 pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL) 810 811 @staticmethod 812 def get_shared_state_pickle(path: str = "state.pickle") -> Any: 813 """ 814 Get the state in the shared folder using pickle 815 """ 816 import pickle 817 818 with open(f"/shared/{path}", "rb") as handle: 819 return pickle.load(handle) 820 821 @staticmethod 822 def set_shared_state(value: Any, path: str = "state.json") -> None: 823 """ 824 Set the state in the shared folder using pickle 825 """ 826 import json 827 828 with open(f"/shared/{path}", "w", encoding="utf-8") as f: 829 json.dump(value, f, ensure_ascii=False, indent=4) 830 831 @staticmethod 832 def get_shared_state(path: str = "state.json") -> None: 833 """ 834 Get the state in the shared folder using pickle 835 """ 836 import json 837 838 with open(f"/shared/{path}", "r", encoding="utf-8") as f: 839 return json.load(f) 840 841 def get_resume_urls(self, approver: str = None) -> dict: 842 nonce = random.randint(0, 1000000000) 843 job_id = os.environ.get("WM_JOB_ID") or "NO_ID" 844 return self.get( 845 f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}", 846 params={"approver": approver}, 847 ).json() 848 849 def request_interactive_slack_approval( 850 self, 851 slack_resource_path: str, 852 channel_id: str, 853 message: str = None, 854 approver: str = None, 855 default_args_json: dict = None, 856 dynamic_enums_json: dict = None, 857 ) -> None: 858 """ 859 Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields. 860 861 **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality. 862 Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form 863 864 :param slack_resource_path: The path to the Slack resource in Windmill. 865 :type slack_resource_path: str 866 :param channel_id: The Slack channel ID where the approval request will be sent. 867 :type channel_id: str 868 :param message: Optional custom message to include in the Slack approval request. 869 :type message: str, optional 870 :param approver: Optional user ID or name of the approver for the request. 871 :type approver: str, optional 872 :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields. 873 :type default_args_json: dict, optional 874 :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields. 875 :type dynamic_enums_json: dict, optional 876 877 :raises Exception: If the function is not called within a flow or flow preview. 878 :raises Exception: If the required flow job or flow step environment variables are not set. 879 880 :return: None 881 882 **Usage Example:** 883 >>> client.request_interactive_slack_approval( 884 ... slack_resource_path="/u/alex/my_slack_resource", 885 ... channel_id="admins-slack-channel", 886 ... message="Please approve this request", 887 ... approver="approver123", 888 ... default_args_json={"key1": "value1", "key2": 42}, 889 ... dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]}, 890 ... ) 891 892 **Notes:** 893 - This function must be executed within a Windmill flow or flow preview. 894 - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context. 895 """ 896 workspace = self.workspace 897 flow_job_id = os.environ.get("WM_FLOW_JOB_ID") 898 899 if not flow_job_id: 900 raise Exception( 901 "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview." 902 ) 903 904 # Only include non-empty parameters 905 params = {} 906 if message: 907 params["message"] = message 908 if approver: 909 params["approver"] = approver 910 if slack_resource_path: 911 params["slack_resource_path"] = slack_resource_path 912 if channel_id: 913 params["channel_id"] = channel_id 914 if os.environ.get("WM_FLOW_STEP_ID"): 915 params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID") 916 if default_args_json: 917 params["default_args_json"] = json.dumps(default_args_json) 918 if dynamic_enums_json: 919 params["dynamic_enums_json"] = json.dumps(dynamic_enums_json) 920 921 self.get( 922 f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}", 923 params=params, 924 ) 925 926 def username_to_email(self, username: str) -> str: 927 """ 928 Get email from workspace username 929 This method is particularly useful for apps that require the email address of the viewer. 930 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 931 """ 932 return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text 933 934 def send_teams_message( 935 self, 936 conversation_id: str, 937 text: str, 938 success: bool = True, 939 card_block: dict = None, 940 ): 941 """ 942 Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message 943 """ 944 return self.post( 945 f"/teams/activities", 946 json={ 947 "conversation_id": conversation_id, 948 "text": text, 949 "success": success, 950 "card_block": card_block, 951 }, 952 )
36 def __init__(self, base_url=None, token=None, workspace=None, verify=True): 37 base = ( 38 base_url 39 or os.environ.get("BASE_INTERNAL_URL") 40 or os.environ.get("WM_BASE_URL") 41 ) 42 43 self.base_url = f"{base}/api" 44 self.token = token or os.environ.get("WM_TOKEN") 45 self.headers = { 46 "Content-Type": "application/json", 47 "Authorization": f"Bearer {self.token}", 48 } 49 self.verify = verify 50 self.client = self.get_client() 51 self.workspace = workspace or os.environ.get("WM_WORKSPACE") 52 self.path = os.environ.get("WM_JOB_PATH") 53 54 self.mocked_api = self.get_mocked_api() 55 56 assert self.workspace, ( 57 f"workspace required as an argument or as WM_WORKSPACE environment variable" 58 )
60 def get_mocked_api(self) -> Optional[dict]: 61 mocked_path = os.environ.get("WM_MOCKED_API_FILE") 62 if not mocked_path: 63 return None 64 logger.info("Using mocked API from %s", mocked_path) 65 mocked_api = {"variables": {}, "resources": {}} 66 try: 67 with open(mocked_path, "r") as f: 68 incoming_mocked_api = json.load(f) 69 mocked_api = {**mocked_api, **incoming_mocked_api} 70 except Exception as e: 71 logger.warning( 72 "Error parsing mocked API file at path %s Using empty mocked API.", 73 mocked_path, 74 ) 75 logger.debug(e) 76 return mocked_api
85 def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 86 endpoint = endpoint.lstrip("/") 87 resp = self.client.get(f"/{endpoint}", **kwargs) 88 if raise_for_status: 89 try: 90 resp.raise_for_status() 91 except httpx.HTTPStatusError as err: 92 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 93 logger.error(error) 94 raise Exception(error) 95 return resp
97 def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response: 98 endpoint = endpoint.lstrip("/") 99 resp = self.client.post(f"/{endpoint}", **kwargs) 100 if raise_for_status: 101 try: 102 resp.raise_for_status() 103 except httpx.HTTPStatusError as err: 104 error = f"{err.request.url}: {err.response.status_code}, {err.response.text}" 105 logger.error(error) 106 raise Exception(error) 107 return resp
117 def run_script_async( 118 self, 119 path: str = None, 120 hash_: str = None, 121 args: dict = None, 122 scheduled_in_secs: int = None, 123 ) -> str: 124 """Create a script job and return its job id. 125 126 .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead. 127 """ 128 logging.warning( 129 "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.", 130 ) 131 assert not (path and hash_), "path and hash_ are mutually exclusive" 132 return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job and return its job id.
Deprecated since version Use run_script_by_path_async or run_script_by_hash_async instead..
158 def run_script_by_path_async( 159 self, 160 path: str, 161 args: dict = None, 162 scheduled_in_secs: int = None, 163 ) -> str: 164 """Create a script job by path and return its job id.""" 165 return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job by path and return its job id.
167 def run_script_by_hash_async( 168 self, 169 hash_: str, 170 args: dict = None, 171 scheduled_in_secs: int = None, 172 ) -> str: 173 """Create a script job by hash and return its job id.""" 174 return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job by hash and return its job id.
176 def run_flow_async( 177 self, 178 path: str, 179 args: dict = None, 180 scheduled_in_secs: int = None, 181 # can only be set to false if this the job will be fully await and not concurrent with any other job 182 # as otherwise the child flow and its own child will store their state in the parent job which will 183 # lead to incorrectness and failures 184 do_not_track_in_parent: bool = True, 185 ) -> str: 186 """Create a flow job and return its job id.""" 187 args = args or {} 188 params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {} 189 if not do_not_track_in_parent: 190 if os.environ.get("WM_JOB_ID"): 191 params["parent_job"] = os.environ.get("WM_JOB_ID") 192 if os.environ.get("WM_ROOT_FLOW_JOB_ID"): 193 params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID") 194 if path: 195 endpoint = f"/w/{self.workspace}/jobs/run/f/{path}" 196 else: 197 raise Exception("path must be provided") 198 return self.post(endpoint, json=args, params=params).text
Create a flow job and return its job id.
200 def run_script( 201 self, 202 path: str = None, 203 hash_: str = None, 204 args: dict = None, 205 timeout: dt.timedelta | int | float | None = None, 206 verbose: bool = False, 207 cleanup: bool = True, 208 assert_result_is_not_none: bool = False, 209 ) -> Any: 210 """Run script synchronously and return its result. 211 212 .. deprecated:: Use run_script_by_path or run_script_by_hash instead. 213 """ 214 logging.warning( 215 "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.", 216 ) 217 assert not (path and hash_), "path and hash_ are mutually exclusive" 218 return self._run_script_internal( 219 path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose, 220 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 221 )
Run script synchronously and return its result.
Deprecated since version Use run_script_by_path or run_script_by_hash instead..
250 def run_script_by_path( 251 self, 252 path: str, 253 args: dict = None, 254 timeout: dt.timedelta | int | float | None = None, 255 verbose: bool = False, 256 cleanup: bool = True, 257 assert_result_is_not_none: bool = False, 258 ) -> Any: 259 """Run script by path synchronously and return its result.""" 260 return self._run_script_internal( 261 path=path, args=args, timeout=timeout, verbose=verbose, 262 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 263 )
Run script by path synchronously and return its result.
265 def run_script_by_hash( 266 self, 267 hash_: str, 268 args: dict = None, 269 timeout: dt.timedelta | int | float | None = None, 270 verbose: bool = False, 271 cleanup: bool = True, 272 assert_result_is_not_none: bool = False, 273 ) -> Any: 274 """Run script by hash synchronously and return its result.""" 275 return self._run_script_internal( 276 hash_=hash_, args=args, timeout=timeout, verbose=verbose, 277 cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none 278 )
Run script by hash synchronously and return its result.
280 def wait_job( 281 self, 282 job_id, 283 timeout: dt.timedelta | int | float | None = None, 284 verbose: bool = False, 285 cleanup: bool = True, 286 assert_result_is_not_none: bool = False, 287 ): 288 def cancel_job(): 289 logger.warning(f"cancelling job: {job_id}") 290 self.post( 291 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 292 json={"reason": "parent script cancelled"}, 293 ).raise_for_status() 294 295 if cleanup: 296 atexit.register(cancel_job) 297 298 start_time = time.time() 299 300 if isinstance(timeout, dt.timedelta): 301 timeout = timeout.total_seconds() 302 303 while True: 304 result_res = self.get( 305 f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True 306 ).json() 307 308 started = result_res["started"] 309 completed = result_res["completed"] 310 success = result_res["success"] 311 312 if not started and verbose: 313 logger.info(f"job {job_id} has not started yet") 314 315 if cleanup and completed: 316 atexit.unregister(cancel_job) 317 318 if completed: 319 result = result_res["result"] 320 if success: 321 if result is None and assert_result_is_not_none: 322 raise Exception("Result was none") 323 return result 324 else: 325 error = result["error"] 326 raise Exception(f"Job {job_id} was not successful: {str(error)}") 327 328 if timeout and ((time.time() - start_time) > timeout): 329 msg = "reached timeout" 330 logger.warning(msg) 331 self.post( 332 f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}", 333 json={"reason": msg}, 334 ) 335 raise TimeoutError(msg) 336 if verbose: 337 logger.info(f"sleeping 0.5 seconds for {job_id = }") 338 339 time.sleep(0.5)
341 def cancel_running(self) -> dict: 342 """Cancel currently running executions of the same script.""" 343 logger.info("canceling running executions of this script") 344 345 jobs = self.get( 346 f"/w/{self.workspace}/jobs/list", 347 params={ 348 "running": "true", 349 "script_path_exact": self.path, 350 }, 351 ).json() 352 353 current_job_id = os.environ.get("WM_JOB_ID") 354 355 logger.debug(f"{current_job_id = }") 356 357 job_ids = [j["id"] for j in jobs if j["id"] != current_job_id] 358 359 if job_ids: 360 logger.info(f"cancelling the following job ids: {job_ids}") 361 else: 362 logger.info("no previous executions to cancel") 363 364 result = {} 365 366 for id_ in job_ids: 367 result[id_] = self.post( 368 f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}", 369 json={"reason": "killed by `cancel_running` method"}, 370 ) 371 372 return result
Cancel currently running executions of the same script.
384 def get_job_status(self, job_id: str) -> JobStatus: 385 job = self.get_job(job_id) 386 job_type = job.get("type", "") 387 assert job_type, f"{job} is not a valid job" 388 if job_type.lower() == "completedjob": 389 return "COMPLETED" 390 if job.get("running"): 391 return "RUNNING" 392 return "WAITING"
394 def get_result( 395 self, 396 job_id: str, 397 assert_result_is_not_none: bool = True, 398 ) -> Any: 399 result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}") 400 result_text = result.text 401 if assert_result_is_not_none and result_text is None: 402 raise Exception(f"result is None for {job_id = }") 403 try: 404 return result.json() 405 except JSONDecodeError: 406 return result_text
408 def get_variable(self, path: str) -> str: 409 path = parse_variable_syntax(path) or path 410 if self.mocked_api is not None: 411 variables = self.mocked_api["variables"] 412 try: 413 result = variables[path] 414 return result 415 except KeyError: 416 logger.info( 417 f"MockedAPI present, but variable not found at {path}, falling back to real API" 418 ) 419 420 """Get variable from Windmill""" 421 return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()
423 def set_variable(self, path: str, value: str, is_secret: bool = False) -> None: 424 path = parse_variable_syntax(path) or path 425 if self.mocked_api is not None: 426 self.mocked_api["variables"][path] = value 427 return 428 429 """Set variable from Windmill""" 430 # check if variable exists 431 r = self.get( 432 f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False 433 ) 434 if r.status_code == 404: 435 # create variable 436 self.post( 437 f"/w/{self.workspace}/variables/create", 438 json={ 439 "path": path, 440 "value": value, 441 "is_secret": is_secret, 442 "description": "", 443 }, 444 ) 445 else: 446 # update variable 447 self.post( 448 f"/w/{self.workspace}/variables/update/{path}", 449 json={"value": value}, 450 )
452 def get_resource( 453 self, 454 path: str, 455 none_if_undefined: bool = False, 456 ) -> dict | None: 457 path = parse_resource_syntax(path) or path 458 if self.mocked_api is not None: 459 resources = self.mocked_api["resources"] 460 try: 461 result = resources[path] 462 return result 463 except KeyError: 464 # NOTE: should mocked_api respect `none_if_undefined`? 465 if none_if_undefined: 466 logger.info( 467 f"resource not found at ${path}, but none_if_undefined is True, so returning None" 468 ) 469 return None 470 logger.info( 471 f"MockedAPI present, but resource not found at ${path}, falling back to real API" 472 ) 473 474 """Get resource from Windmill""" 475 try: 476 return self.get( 477 f"/w/{self.workspace}/resources/get_value_interpolated/{path}" 478 ).json() 479 except Exception as e: 480 if none_if_undefined: 481 return None 482 logger.error(e) 483 raise e
485 def set_resource( 486 self, 487 value: Any, 488 path: str, 489 resource_type: str, 490 ): 491 path = parse_resource_syntax(path) or path 492 if self.mocked_api is not None: 493 self.mocked_api["resources"][path] = value 494 return 495 496 # check if resource exists 497 r = self.get( 498 f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False 499 ) 500 if r.status_code == 404: 501 # create resource 502 self.post( 503 f"/w/{self.workspace}/resources/create", 504 json={ 505 "path": path, 506 "value": value, 507 "resource_type": resource_type, 508 }, 509 ) 510 else: 511 # update resource 512 self.post( 513 f"/w/{self.workspace}/resources/update_value/{path}", 514 json={"value": value}, 515 )
520 def set_progress(self, value: int, job_id: Optional[str] = None): 521 workspace = get_workspace() 522 flow_id = os.environ.get("WM_FLOW_JOB_ID") 523 job_id = job_id or os.environ.get("WM_JOB_ID") 524 525 if job_id != None: 526 job = self.get_job(job_id) 527 flow_id = job.get("parent_job") 528 529 self.post( 530 f"/w/{workspace}/job_metrics/set_progress/{job_id}", 531 json={ 532 "percent": value, 533 "flow_job_id": flow_id or None, 534 }, 535 )
537 def get_progress(self, job_id: Optional[str] = None) -> Any: 538 workspace = get_workspace() 539 job_id = job_id or os.environ.get("WM_JOB_ID") 540 541 r = self.get( 542 f"/w/{workspace}/job_metrics/get_progress/{job_id}", 543 ) 544 if r.status_code == 404: 545 print(f"Job {job_id} does not exist") 546 return None 547 else: 548 return r.json()
550 def set_flow_user_state(self, key: str, value: Any) -> None: 551 """Set the user state of a flow at a given key""" 552 flow_id = self.get_root_job_id() 553 r = self.post( 554 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 555 json=value, 556 raise_for_status=False, 557 ) 558 if r.status_code == 404: 559 print(f"Job {flow_id} does not exist or is not a flow")
Set the user state of a flow at a given key
561 def get_flow_user_state(self, key: str) -> Any: 562 """Get the user state of a flow at a given key""" 563 flow_id = self.get_root_job_id() 564 r = self.get( 565 f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}", 566 raise_for_status=False, 567 ) 568 if r.status_code == 404: 569 print(f"Job {flow_id} does not exist or is not a flow") 570 return None 571 else: 572 return r.json()
Get the user state of a flow at a given key
578 def get_duckdb_connection_settings( 579 self, 580 s3_resource_path: str = "", 581 ) -> DuckDbConnectionSettings | None: 582 """ 583 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 584 initiate an S3 connection from DuckDB 585 """ 586 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 587 try: 588 raw_obj = self.post( 589 f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings", 590 json={} 591 if s3_resource_path == "" 592 else {"s3_resource_path": s3_resource_path}, 593 ).json() 594 return DuckDbConnectionSettings(raw_obj) 595 except JSONDecodeError as e: 596 raise Exception( 597 "Could not generate DuckDB S3 connection settings from the provided resource" 598 ) from e
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB
600 def get_polars_connection_settings( 601 self, 602 s3_resource_path: str = "", 603 ) -> PolarsConnectionSettings: 604 """ 605 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 606 initiate an S3 connection from Polars 607 """ 608 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 609 try: 610 raw_obj = self.post( 611 f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings", 612 json={} 613 if s3_resource_path == "" 614 else {"s3_resource_path": s3_resource_path}, 615 ).json() 616 return PolarsConnectionSettings(raw_obj) 617 except JSONDecodeError as e: 618 raise Exception( 619 "Could not generate Polars S3 connection settings from the provided resource" 620 ) from e
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars
622 def get_boto3_connection_settings( 623 self, 624 s3_resource_path: str = "", 625 ) -> Boto3ConnectionSettings: 626 """ 627 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 628 initiate an S3 connection using boto3 629 """ 630 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 631 try: 632 s3_resource = self.post( 633 f"/w/{self.workspace}/job_helpers/v2/s3_resource_info", 634 json={} 635 if s3_resource_path == "" 636 else {"s3_resource_path": s3_resource_path}, 637 ).json() 638 return self.__boto3_connection_settings(s3_resource) 639 except JSONDecodeError as e: 640 raise Exception( 641 "Could not generate Boto3 S3 connection settings from the provided resource" 642 ) from e
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3
644 def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes: 645 """ 646 Load a file from the workspace s3 bucket and returns its content as bytes. 647 648 '''python 649 from wmill import S3Object 650 651 s3_obj = S3Object(s3="/path/to/my_file.txt") 652 my_obj_content = client.load_s3_file(s3_obj) 653 file_content = my_obj_content.decode("utf-8") 654 ''' 655 """ 656 s3object = parse_s3_object(s3object) 657 with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 658 return file_reader.read()
Load a file from the workspace s3 bucket and returns its content as bytes.
'''python from wmill import S3Object
s3_obj = S3Object(s3="/path/to/my_file.txt") my_obj_content = client.load_s3_file(s3_obj) file_content = my_obj_content.decode("utf-8") '''
660 def load_s3_file_reader( 661 self, s3object: S3Object | str, s3_resource_path: str | None 662 ) -> BufferedReader: 663 """ 664 Load a file from the workspace s3 bucket and returns the bytes stream. 665 666 '''python 667 from wmill import S3Object 668 669 s3_obj = S3Object(s3="/path/to/my_file.txt") 670 with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 671 print(file_reader.read()) 672 ''' 673 """ 674 s3object = parse_s3_object(s3object) 675 reader = S3BufferedReader( 676 f"{self.workspace}", 677 self.client, 678 s3object["s3"], 679 s3_resource_path, 680 s3object["storage"] if "storage" in s3object else None, 681 ) 682 return reader
Load a file from the workspace s3 bucket and returns the bytes stream.
'''python from wmill import S3Object
s3_obj = S3Object(s3="/path/to/my_file.txt") with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader: print(file_reader.read()) '''
684 def write_s3_file( 685 self, 686 s3object: S3Object | str | None, 687 file_content: BufferedReader | bytes, 688 s3_resource_path: str | None, 689 content_type: str | None = None, 690 content_disposition: str | None = None, 691 ) -> S3Object: 692 """ 693 Write a file to the workspace S3 bucket 694 695 '''python 696 from wmill import S3Object 697 698 s3_obj = S3Object(s3="/path/to/my_file.txt") 699 700 # for an in memory bytes array: 701 file_content = b'Hello Windmill!' 702 client.write_s3_file(s3_obj, file_content) 703 704 # for a file: 705 with open("my_file.txt", "rb") as my_file: 706 client.write_s3_file(s3_obj, my_file) 707 ''' 708 """ 709 s3object = parse_s3_object(s3object) 710 # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator 711 if isinstance(file_content, BufferedReader): 712 content_payload = bytes_generator(file_content) 713 elif isinstance(file_content, bytes): 714 content_payload = file_content 715 else: 716 raise Exception("Type of file_content not supported") 717 718 query_params = {} 719 if s3object is not None and s3object["s3"] != "": 720 query_params["file_key"] = s3object["s3"] 721 if s3_resource_path is not None and s3_resource_path != "": 722 query_params["s3_resource_path"] = s3_resource_path 723 if ( 724 s3object is not None 725 and "storage" in s3object 726 and s3object["storage"] is not None 727 ): 728 query_params["storage"] = s3object["storage"] 729 if content_type is not None: 730 query_params["content_type"] = content_type 731 if content_disposition is not None: 732 query_params["content_disposition"] = content_disposition 733 734 try: 735 # need a vanilla client b/c content-type is not application/json here 736 response = httpx.post( 737 f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file", 738 headers={ 739 "Authorization": f"Bearer {self.token}", 740 "Content-Type": "application/octet-stream", 741 }, 742 params=query_params, 743 content=content_payload, 744 verify=self.verify, 745 timeout=None, 746 ).json() 747 except Exception as e: 748 raise Exception("Could not write file to S3") from e 749 return S3Object(s3=response["file_key"])
Write a file to the workspace S3 bucket
'''python from wmill import S3Object
s3_obj = S3Object(s3="/path/to/my_file.txt")
for an in memory bytes array:
file_content = b'Hello Windmill!' client.write_s3_file(s3_obj, file_content)
for a file:
with open("my_file.txt", "rb") as my_file: client.write_s3_file(s3_obj, my_file) '''
849 def request_interactive_slack_approval( 850 self, 851 slack_resource_path: str, 852 channel_id: str, 853 message: str = None, 854 approver: str = None, 855 default_args_json: dict = None, 856 dynamic_enums_json: dict = None, 857 ) -> None: 858 """ 859 Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields. 860 861 **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality. 862 Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form 863 864 :param slack_resource_path: The path to the Slack resource in Windmill. 865 :type slack_resource_path: str 866 :param channel_id: The Slack channel ID where the approval request will be sent. 867 :type channel_id: str 868 :param message: Optional custom message to include in the Slack approval request. 869 :type message: str, optional 870 :param approver: Optional user ID or name of the approver for the request. 871 :type approver: str, optional 872 :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields. 873 :type default_args_json: dict, optional 874 :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields. 875 :type dynamic_enums_json: dict, optional 876 877 :raises Exception: If the function is not called within a flow or flow preview. 878 :raises Exception: If the required flow job or flow step environment variables are not set. 879 880 :return: None 881 882 **Usage Example:** 883 >>> client.request_interactive_slack_approval( 884 ... slack_resource_path="/u/alex/my_slack_resource", 885 ... channel_id="admins-slack-channel", 886 ... message="Please approve this request", 887 ... approver="approver123", 888 ... default_args_json={"key1": "value1", "key2": 42}, 889 ... dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]}, 890 ... ) 891 892 **Notes:** 893 - This function must be executed within a Windmill flow or flow preview. 894 - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context. 895 """ 896 workspace = self.workspace 897 flow_job_id = os.environ.get("WM_FLOW_JOB_ID") 898 899 if not flow_job_id: 900 raise Exception( 901 "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview." 902 ) 903 904 # Only include non-empty parameters 905 params = {} 906 if message: 907 params["message"] = message 908 if approver: 909 params["approver"] = approver 910 if slack_resource_path: 911 params["slack_resource_path"] = slack_resource_path 912 if channel_id: 913 params["channel_id"] = channel_id 914 if os.environ.get("WM_FLOW_STEP_ID"): 915 params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID") 916 if default_args_json: 917 params["default_args_json"] = json.dumps(default_args_json) 918 if dynamic_enums_json: 919 params["dynamic_enums_json"] = json.dumps(dynamic_enums_json) 920 921 self.get( 922 f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}", 923 params=params, 924 )
Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
[Enterprise Edition Only] To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality. Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
Parameters
- slack_resource_path: The path to the Slack resource in Windmill.
- channel_id: The Slack channel ID where the approval request will be sent.
- message: Optional custom message to include in the Slack approval request.
- approver: Optional user ID or name of the approver for the request.
- default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
- dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
Raises
- Exception: If the function is not called within a flow or flow preview.
- Exception: If the required flow job or flow step environment variables are not set.
Returns
None
Usage Example:
client.request_interactive_slack_approval( ... slack_resource_path="/u/alex/my_slack_resource", ... channel_id="admins-slack-channel", ... message="Please approve this request", ... approver="approver123", ... default_args_json={"key1": "value1", "key2": 42}, ... dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]}, ... )
Notes:
- This function must be executed within a Windmill flow or flow preview.
- The function checks for required environment variables (
WM_FLOW_JOB_ID
,WM_FLOW_STEP_ID
) to ensure it is run in the appropriate context.
926 def username_to_email(self, username: str) -> str: 927 """ 928 Get email from workspace username 929 This method is particularly useful for apps that require the email address of the viewer. 930 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 931 """ 932 return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text
Get email from workspace username This method is particularly useful for apps that require the email address of the viewer. Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
934 def send_teams_message( 935 self, 936 conversation_id: str, 937 text: str, 938 success: bool = True, 939 card_block: dict = None, 940 ): 941 """ 942 Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message 943 """ 944 return self.post( 945 f"/teams/activities", 946 json={ 947 "conversation_id": conversation_id, 948 "text": text, 949 "success": success, 950 "card_block": card_block, 951 }, 952 )
Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
966def deprecate(in_favor_of: str): 967 def decorator(f): 968 @functools.wraps(f) 969 def wrapper(*args, **kwargs): 970 warnings.warn( 971 ( 972 f"The '{f.__name__}' method is deprecated and may be removed in the future. " 973 f"Consider {in_favor_of}" 974 ), 975 DeprecationWarning, 976 ) 977 return f(*args, **kwargs) 978 979 return wrapper 980 981 return decorator
1000@init_global_client 1001def run_script_async( 1002 hash_or_path: str, 1003 args: Dict[str, Any] = None, 1004 scheduled_in_secs: int = None, 1005) -> str: 1006 is_path = "/" in hash_or_path 1007 hash_ = None if is_path else hash_or_path 1008 path = hash_or_path if is_path else None 1009 return _client.run_script_async( 1010 hash_=hash_, 1011 path=path, 1012 args=args, 1013 scheduled_in_secs=scheduled_in_secs, 1014 )
1017@init_global_client 1018def run_flow_async( 1019 path: str, 1020 args: Dict[str, Any] = None, 1021 scheduled_in_secs: int = None, 1022 # can only be set to false if this the job will be fully await and not concurrent with any other job 1023 # as otherwise the child flow and its own child will store their state in the parent job which will 1024 # lead to incorrectness and failures 1025 do_not_track_in_parent: bool = True, 1026) -> str: 1027 return _client.run_flow_async( 1028 path=path, 1029 args=args, 1030 scheduled_in_secs=scheduled_in_secs, 1031 do_not_track_in_parent=do_not_track_in_parent, 1032 )
1035@init_global_client 1036def run_script_sync( 1037 hash: str, 1038 args: Dict[str, Any] = None, 1039 verbose: bool = False, 1040 assert_result_is_not_none: bool = True, 1041 cleanup: bool = True, 1042 timeout: dt.timedelta = None, 1043) -> Any: 1044 return _client.run_script( 1045 hash_=hash, 1046 args=args, 1047 verbose=verbose, 1048 assert_result_is_not_none=assert_result_is_not_none, 1049 cleanup=cleanup, 1050 timeout=timeout, 1051 )
1067@init_global_client 1068def run_script_by_hash_async( 1069 hash_: str, 1070 args: Dict[str, Any] = None, 1071 scheduled_in_secs: Union[None, int] = None, 1072) -> str: 1073 return _client.run_script_by_hash_async( 1074 hash_=hash_, 1075 args=args, 1076 scheduled_in_secs=scheduled_in_secs, 1077 )
1080@init_global_client 1081def run_script_by_path_sync( 1082 path: str, 1083 args: Dict[str, Any] = None, 1084 verbose: bool = False, 1085 assert_result_is_not_none: bool = True, 1086 cleanup: bool = True, 1087 timeout: dt.timedelta = None, 1088) -> Any: 1089 return _client.run_script( 1090 path=path, 1091 args=args, 1092 verbose=verbose, 1093 assert_result_is_not_none=assert_result_is_not_none, 1094 cleanup=cleanup, 1095 timeout=timeout, 1096 )
1099@init_global_client 1100def get_id_token(audience: str) -> str: 1101 """ 1102 Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc. 1103 """ 1104 return _client.get_id_token(audience)
Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc.
1119@init_global_client 1120def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings: 1121 """ 1122 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1123 initiate an S3 connection from DuckDB 1124 """ 1125 return _client.get_duckdb_connection_settings(s3_resource_path)
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB
1128@init_global_client 1129def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings: 1130 """ 1131 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1132 initiate an S3 connection from Polars 1133 """ 1134 return _client.get_polars_connection_settings(s3_resource_path)
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars
1137@init_global_client 1138def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings: 1139 """ 1140 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1141 initiate an S3 connection using boto3 1142 """ 1143 return _client.get_boto3_connection_settings(s3_resource_path)
Convenient helpers that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3
1146@init_global_client 1147def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes: 1148 """ 1149 Load the entire content of a file stored in S3 as bytes 1150 """ 1151 return _client.load_s3_file( 1152 s3object, s3_resource_path if s3_resource_path != "" else None 1153 )
Load the entire content of a file stored in S3 as bytes
1156@init_global_client 1157def load_s3_file_reader( 1158 s3object: S3Object | str, s3_resource_path: str | None = None 1159) -> BufferedReader: 1160 """ 1161 Load the content of a file stored in S3 1162 """ 1163 return _client.load_s3_file_reader( 1164 s3object, s3_resource_path if s3_resource_path != "" else None 1165 )
Load the content of a file stored in S3
1168@init_global_client 1169def write_s3_file( 1170 s3object: S3Object | str | None, 1171 file_content: BufferedReader | bytes, 1172 s3_resource_path: str | None = None, 1173 content_type: str | None = None, 1174 content_disposition: str | None = None, 1175) -> S3Object: 1176 """ 1177 Upload a file to S3 1178 1179 Content type will be automatically guessed from path extension if left empty 1180 1181 See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition 1182 and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type 1183 1184 """ 1185 return _client.write_s3_file( 1186 s3object, 1187 file_content, 1188 s3_resource_path if s3_resource_path != "" else None, 1189 content_type, 1190 content_disposition, 1191 )
Upload a file to S3
Content type will be automatically guessed from path extension if left empty
See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
1194@init_global_client 1195def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]: 1196 """ 1197 Sign S3 objects to be used by anonymous users in public apps 1198 Returns a list of signed s3 tokens 1199 """ 1200 return _client.sign_s3_objects(s3_objects)
Sign S3 objects to be used by anonymous users in public apps Returns a list of signed s3 tokens
1203@init_global_client 1204def sign_s3_object(s3_object: S3Object| str) -> S3Object: 1205 """ 1206 Sign S3 object to be used by anonymous users in public apps 1207 Returns a signed s3 object 1208 """ 1209 return _client.sign_s3_object(s3_object)
Sign S3 object to be used by anonymous users in public apps Returns a signed s3 object
1212@init_global_client 1213def whoami() -> dict: 1214 """ 1215 Returns the current user 1216 """ 1217 return _client.user
Returns the current user
1220@init_global_client 1221@deprecate("Windmill().state") 1222def get_state() -> Any: 1223 """ 1224 Get the state 1225 """ 1226 return _client.state
Get the state
1229@init_global_client 1230def get_resource( 1231 path: str, 1232 none_if_undefined: bool = False, 1233) -> dict | None: 1234 """Get resource from Windmill""" 1235 return _client.get_resource(path, none_if_undefined)
Get resource from Windmill
1238@init_global_client 1239def set_resource(path: str, value: Any, resource_type: str = "any") -> None: 1240 """ 1241 Set the resource at a given path as a string, creating it if it does not exist 1242 """ 1243 return _client.set_resource(value=value, path=path, resource_type=resource_type)
Set the resource at a given path as a string, creating it if it does not exist
1246@init_global_client 1247def set_state(value: Any) -> None: 1248 """ 1249 Set the state 1250 """ 1251 return _client.set_state(value)
Set the state
1254@init_global_client 1255def set_progress(value: int, job_id: Optional[str] = None) -> None: 1256 """ 1257 Set the progress 1258 """ 1259 return _client.set_progress(value, job_id)
Set the progress
1262@init_global_client 1263def get_progress(job_id: Optional[str] = None) -> Any: 1264 """ 1265 Get the progress 1266 """ 1267 return _client.get_progress(job_id)
Get the progress
1299@init_global_client 1300def get_variable(path: str) -> str: 1301 """ 1302 Returns the variable at a given path as a string 1303 """ 1304 return _client.get_variable(path)
Returns the variable at a given path as a string
1307@init_global_client 1308def set_variable(path: str, value: str, is_secret: bool = False) -> None: 1309 """ 1310 Set the variable at a given path as a string, creating it if it does not exist 1311 """ 1312 return _client.set_variable(path, value, is_secret)
Set the variable at a given path as a string, creating it if it does not exist
1315@init_global_client 1316def get_flow_user_state(key: str) -> Any: 1317 """ 1318 Get the user state of a flow at a given key 1319 """ 1320 return _client.get_flow_user_state(key)
Get the user state of a flow at a given key
1323@init_global_client 1324def set_flow_user_state(key: str, value: Any) -> None: 1325 """ 1326 Set the user state of a flow at a given key 1327 """ 1328 return _client.set_flow_user_state(key, value)
Set the user state of a flow at a given key
1341@init_global_client 1342def request_interactive_slack_approval( 1343 slack_resource_path: str, 1344 channel_id: str, 1345 message: str = None, 1346 approver: str = None, 1347 default_args_json: dict = None, 1348 dynamic_enums_json: dict = None, 1349) -> None: 1350 return _client.request_interactive_slack_approval( 1351 slack_resource_path=slack_resource_path, 1352 channel_id=channel_id, 1353 message=message, 1354 approver=approver, 1355 default_args_json=default_args_json, 1356 dynamic_enums_json=dynamic_enums_json, 1357 )
1367@init_global_client 1368def cancel_running() -> dict: 1369 """Cancel currently running executions of the same script.""" 1370 return _client.cancel_running()
Cancel currently running executions of the same script.
1373@init_global_client 1374def run_script( 1375 path: str = None, 1376 hash_: str = None, 1377 args: dict = None, 1378 timeout: dt.timedelta | int | float = None, 1379 verbose: bool = False, 1380 cleanup: bool = True, 1381 assert_result_is_not_none: bool = True, 1382) -> Any: 1383 """Run script synchronously and return its result. 1384 1385 .. deprecated:: Use run_script_by_path or run_script_by_hash instead. 1386 """ 1387 return _client.run_script( 1388 path=path, 1389 hash_=hash_, 1390 args=args, 1391 verbose=verbose, 1392 assert_result_is_not_none=assert_result_is_not_none, 1393 cleanup=cleanup, 1394 timeout=timeout, 1395 )
Run script synchronously and return its result.
Deprecated since version Use run_script_by_path or run_script_by_hash instead..
1398@init_global_client 1399def run_script_by_path( 1400 path: str, 1401 args: dict = None, 1402 timeout: dt.timedelta | int | float = None, 1403 verbose: bool = False, 1404 cleanup: bool = True, 1405 assert_result_is_not_none: bool = True, 1406) -> Any: 1407 """Run script by path synchronously and return its result.""" 1408 return _client.run_script_by_path( 1409 path=path, 1410 args=args, 1411 verbose=verbose, 1412 assert_result_is_not_none=assert_result_is_not_none, 1413 cleanup=cleanup, 1414 timeout=timeout, 1415 )
Run script by path synchronously and return its result.
1418@init_global_client 1419def run_script_by_hash( 1420 hash_: str, 1421 args: dict = None, 1422 timeout: dt.timedelta | int | float = None, 1423 verbose: bool = False, 1424 cleanup: bool = True, 1425 assert_result_is_not_none: bool = True, 1426) -> Any: 1427 """Run script by hash synchronously and return its result.""" 1428 return _client.run_script_by_hash( 1429 hash_=hash_, 1430 args=args, 1431 verbose=verbose, 1432 assert_result_is_not_none=assert_result_is_not_none, 1433 cleanup=cleanup, 1434 timeout=timeout, 1435 )
Run script by hash synchronously and return its result.
1438@init_global_client 1439def username_to_email(username: str) -> str: 1440 """ 1441 Get email from workspace username 1442 This method is particularly useful for apps that require the email address of the viewer. 1443 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 1444 """ 1445 return _client.username_to_email(username)
Get email from workspace username This method is particularly useful for apps that require the email address of the viewer. Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
1448def task(*args, **kwargs): 1449 from inspect import signature 1450 1451 def f(func, tag: str | None = None): 1452 if ( 1453 os.environ.get("WM_JOB_ID") is None 1454 or os.environ.get("MAIN_OVERRIDE") == func.__name__ 1455 ): 1456 1457 def inner(*args, **kwargs): 1458 return func(*args, **kwargs) 1459 1460 return inner 1461 else: 1462 1463 def inner(*args, **kwargs): 1464 global _client 1465 if _client is None: 1466 _client = Windmill() 1467 w_id = os.environ.get("WM_WORKSPACE") 1468 job_id = os.environ.get("WM_JOB_ID") 1469 f_name = func.__name__ 1470 json = kwargs 1471 params = list(signature(func).parameters) 1472 for i, arg in enumerate(args): 1473 if i < len(params): 1474 p = params[i] 1475 key = p 1476 if key not in kwargs: 1477 json[key] = arg 1478 1479 params = {} 1480 if tag is not None: 1481 params["tag"] = tag 1482 w_as_code_response = _client.post( 1483 f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}", 1484 json={"args": json}, 1485 params=params, 1486 ) 1487 job_id = w_as_code_response.text 1488 print(f"Executing task {func.__name__} on job {job_id}") 1489 job_result = _client.wait_job(job_id) 1490 print(f"Task {func.__name__} ({job_id}) completed") 1491 return job_result 1492 1493 return inner 1494 1495 if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): 1496 return f(args[0], None) 1497 else: 1498 return lambda x: f(x, kwargs.get("tag"))
1500def parse_resource_syntax(s: str) -> Optional[str]: 1501 """Parse resource syntax from string.""" 1502 if s is None: 1503 return None 1504 if s.startswith("$res:"): 1505 return s[5:] 1506 if s.startswith("res://"): 1507 return s[6:] 1508 return None
Parse resource syntax from string.
1510def parse_s3_object(s3_object: S3Object | str) -> S3Object: 1511 """Parse S3 object from string or S3Object format.""" 1512 if isinstance(s3_object, str): 1513 match = re.match(r'^s3://([^/]*)/(.*)$', s3_object) 1514 if match: 1515 return S3Object(s3=match.group(2) or "", storage=match.group(1) or None) 1516 return S3Object(s3="") 1517 else: 1518 return s3_object
Parse S3 object from string or S3Object format.
1522def parse_variable_syntax(s: str) -> Optional[str]: 1523 """Parse variable syntax from string.""" 1524 if s.startswith("var://"): 1525 return s[6:] 1526 return None
Parse variable syntax from string.
1529def append_to_result_stream(text: str) -> None: 1530 """Append a text to the result stream. 1531 1532 Args: 1533 text: text to append to the result stream 1534 """ 1535 print("WM_STREAM: {}".format(text.replace(chr(10), '\\n')))
Append a text to the result stream.
Args: text: text to append to the result stream
1537def stream_result(stream) -> None: 1538 """Stream to the result stream. 1539 1540 Args: 1541 stream: stream to stream to the result stream 1542 """ 1543 for text in stream: 1544 append_to_result_stream(text)
Stream to the result stream.
Args: stream: stream to stream to the result stream