wmill.client
from __future__ import annotations

import atexit
import datetime as dt
import functools
from io import BufferedReader, BytesIO
import logging
import os
import random
import time
import warnings
import json
from json import JSONDecodeError
from typing import Dict, Any, Union, Literal, Optional
import re

import httpx

from .s3_reader import S3BufferedReader, bytes_generator
from .s3_types import (
    Boto3ConnectionSettings,
    DuckDbConnectionSettings,
    PolarsConnectionSettings,
    S3Object,
)

_client: "Windmill | None" = None

logger = logging.getLogger("windmill_client")

JobStatus = Literal["RUNNING", "WAITING", "COMPLETED"]


class Windmill:
    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
        base = (
            base_url
            or os.environ.get("BASE_INTERNAL_URL")
            or os.environ.get("WM_BASE_URL")
        )

        self.base_url = f"{base}/api"
        self.token = token or os.environ.get("WM_TOKEN")
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        self.verify = verify
        self.client = self.get_client()
        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
        self.path = os.environ.get("WM_JOB_PATH")

        self.mocked_api = self.get_mocked_api()

        assert self.workspace, (
            "workspace required as an argument or as WM_WORKSPACE environment variable"
        )

    def get_mocked_api(self) -> Optional[dict]:
        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
        if not mocked_path:
            return None
        logger.info("Using mocked API from %s", mocked_path)
        mocked_api = {"variables": {}, "resources": {}}
        try:
            with open(mocked_path, "r") as f:
                incoming_mocked_api = json.load(f)
                mocked_api = {**mocked_api, **incoming_mocked_api}
        except Exception as e:
            logger.warning(
                "Error parsing mocked API file at path %s. Using empty mocked API.",
                mocked_path,
            )
            logger.debug(e)
        return mocked_api

    def get_client(self) -> httpx.Client:
        return httpx.Client(
            base_url=self.base_url,
            headers=self.headers,
            verify=self.verify,
        )

    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.get(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp

    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.post(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp

    def create_token(self, duration=dt.timedelta(days=1)) -> str:
        endpoint = "/users/tokens/create"
        payload = {
            "label": f"refresh {time.time()}",
            "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        return self.post(endpoint, json=payload).text
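
    # --- Illustrative note (not part of the API) ---
    # A minimal sketch of the WM_MOCKED_API_FILE format consumed by
    # get_mocked_api above: plain JSON whose top-level "variables" and
    # "resources" keys map paths to mocked values (the paths below are
    # hypothetical):
    #
    #     {
    #       "variables": {"u/alice/my_var": "hello"},
    #       "resources": {"u/alice/my_db": {"host": "localhost", "port": 5432}}
    #     }
    #
    # With WM_MOCKED_API_FILE pointing at such a file, get_variable and
    # get_resource resolve from it first and only fall back to the real API
    # on a miss.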

    def run_script_async(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job and return its job id.

        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
        """
        logger.warning(
            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_async_internal(
            path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs
        )

    def _run_script_async_internal(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Internal helper for running scripts asynchronously."""
        args = args or {}
        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
        if os.environ.get("WM_JOB_ID"):
            params["parent_job"] = os.environ.get("WM_JOB_ID")
        if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
            params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")

        if path:
            endpoint = f"/w/{self.workspace}/jobs/run/p/{path}"
        elif hash_:
            endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}"
        else:
            raise Exception("path or hash_ must be provided")

        return self.post(endpoint, json=args, params=params).text

    def run_script_by_path_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by path and return its job id."""
        return self._run_script_async_internal(
            path=path, args=args, scheduled_in_secs=scheduled_in_secs
        )

    def run_script_by_hash_async(
        self,
        hash_: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by hash and return its job id."""
        return self._run_script_async_internal(
            hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs
        )

    def run_flow_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
        # can only be set to False if the job will be fully awaited and not run
        # concurrently with any other job, as otherwise the child flow and its
        # own children would store their state in the parent job, which would
        # lead to incorrectness and failures
        do_not_track_in_parent: bool = True,
    ) -> str:
        """Create a flow job and return its job id."""
        args = args or {}
        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
        if not do_not_track_in_parent:
            if os.environ.get("WM_JOB_ID"):
                params["parent_job"] = os.environ.get("WM_JOB_ID")
            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
        if path:
            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
        else:
            raise Exception("path must be provided")
        return self.post(endpoint, json=args, params=params).text
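
    # --- Usage sketch (illustrative; the script and flow paths are hypothetical) ---
    #
    #     client = Windmill()
    #     # fire-and-forget: returns the job id immediately
    #     job_id = client.run_script_by_path_async(
    #         "u/alice/my_script", args={"x": 1}, scheduled_in_secs=60
    #     )
    #     flow_job_id = client.run_flow_async("u/alice/my_flow", args={"x": 1})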

    def run_script(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script synchronously and return its result.

        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
        """
        logger.warning(
            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_internal(
            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none,
        )

    def _run_script_internal(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Internal helper for running scripts synchronously."""
        args = args or {}

        if verbose:
            if path:
                logger.info(f"running `{path}` synchronously with {args = }")
            elif hash_:
                logger.info(f"running script with hash `{hash_}` synchronously with {args = }")

        if isinstance(timeout, dt.timedelta):
            timeout = timeout.total_seconds()

        job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args)
        return self.wait_job(
            job_id, timeout, verbose, cleanup, assert_result_is_not_none
        )

    def run_script_by_path(
        self,
        path: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by path synchronously and return its result."""
        return self._run_script_internal(
            path=path, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none,
        )

    def run_script_by_hash(
        self,
        hash_: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by hash synchronously and return its result."""
        return self._run_script_internal(
            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none,
        )

    def run_inline_script_preview(
        self,
        content: str,
        language: str,
        args: dict = None,
    ) -> Any:
        """Run a script on the current worker without creating a job"""
        endpoint = f"/w/{self.workspace}/jobs/run_inline/preview"
        body = {
            "content": content,
            "language": language,
            "args": args or {},
        }
        return self.post(endpoint, json=body).text
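
    # --- Usage sketch (illustrative; the script path is hypothetical) ---
    #
    #     result = client.run_script_by_path(
    #         "u/alice/my_script",
    #         args={"x": 1},
    #         timeout=dt.timedelta(minutes=5),  # plain seconds also accepted
    #         assert_result_is_not_none=True,
    #     )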

    def wait_job(
        self,
        job_id,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ):
        def cancel_job():
            logger.warning(f"cancelling job: {job_id}")
            self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                json={"reason": "parent script cancelled"},
            ).raise_for_status()

        if cleanup:
            atexit.register(cancel_job)

        start_time = time.time()

        if isinstance(timeout, dt.timedelta):
            timeout = timeout.total_seconds()

        while True:
            result_res = self.get(
                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
            ).json()

            started = result_res["started"]
            completed = result_res["completed"]
            success = result_res["success"]

            if not started and verbose:
                logger.info(f"job {job_id} has not started yet")

            if cleanup and completed:
                atexit.unregister(cancel_job)

            if completed:
                result = result_res["result"]
                if success:
                    if result is None and assert_result_is_not_none:
                        raise Exception("Result was none")
                    return result
                else:
                    error = result["error"]
                    raise Exception(f"Job {job_id} was not successful: {str(error)}")

            if timeout and ((time.time() - start_time) > timeout):
                msg = "reached timeout"
                logger.warning(msg)
                self.post(
                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                    json={"reason": msg},
                )
                raise TimeoutError(msg)
            if verbose:
                logger.info(f"sleeping 0.5 seconds for {job_id = }")

            time.sleep(0.5)

    def cancel_job(self, job_id: str, reason: str = None) -> str:
        """Cancel a specific job by ID.

        Args:
            job_id: UUID of the job to cancel
            reason: Optional reason for cancellation

        Returns:
            Response message from the cancel endpoint
        """
        logger.info(f"cancelling job: {job_id}")

        payload = {"reason": reason or "cancelled via cancel_job method"}

        response = self.post(
            f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
            json=payload,
        )

        return response.text

    def cancel_running(self) -> dict:
        """Cancel currently running executions of the same script."""
        logger.info("canceling running executions of this script")

        jobs = self.get(
            f"/w/{self.workspace}/jobs/list",
            params={
                "running": "true",
                "script_path_exact": self.path,
            },
        ).json()

        current_job_id = os.environ.get("WM_JOB_ID")

        logger.debug(f"{current_job_id = }")

        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]

        if job_ids:
            logger.info(f"cancelling the following job ids: {job_ids}")
        else:
            logger.info("no previous executions to cancel")

        result = {}

        for id_ in job_ids:
            result[id_] = self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
                json={"reason": "killed by `cancel_running` method"},
            )

        return result

    def get_job(self, job_id: str) -> dict:
        return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json()

    def get_root_job_id(self, job_id: str | None = None) -> str:
        job_id = job_id or os.environ.get("WM_JOB_ID")
        return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json()

    def get_id_token(self, audience: str, expires_in: int | None = None) -> str:
        params = {}
        if expires_in is not None:
            params["expires_in"] = expires_in
        return self.post(f"/w/{self.workspace}/oidc/token/{audience}", params=params).text

    def get_job_status(self, job_id: str) -> JobStatus:
        job = self.get_job(job_id)
        job_type = job.get("type", "")
        assert job_type, f"{job} is not a valid job"
        if job_type.lower() == "completedjob":
            return "COMPLETED"
        if job.get("running"):
            return "RUNNING"
        return "WAITING"

    def get_result(
        self,
        job_id: str,
        assert_result_is_not_none: bool = True,
    ) -> Any:
        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
        result_text = result.text
        if assert_result_is_not_none and result_text is None:
            raise Exception(f"result is None for {job_id = }")
        try:
            return result.json()
        except JSONDecodeError:
            return result_text
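
    # --- Usage sketch (illustrative; the script path is hypothetical) ---
    # Manual polling with the primitives above, instead of wait_job:
    #
    #     job_id = client.run_script_by_path_async("u/alice/my_script", args={})
    #     while client.get_job_status(job_id) != "COMPLETED":
    #         time.sleep(1)
    #     result = client.get_result(job_id)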

    def get_variable(self, path: str) -> str:
        """Get variable from Windmill"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            variables = self.mocked_api["variables"]
            try:
                return variables[path]
            except KeyError:
                logger.info(
                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
                )
        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()

    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
        """Set variable on Windmill, creating it if it does not exist"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["variables"][path] = value
            return

        # check if variable exists
        r = self.get(
            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create variable
            self.post(
                f"/w/{self.workspace}/variables/create",
                json={
                    "path": path,
                    "value": value,
                    "is_secret": is_secret,
                    "description": "",
                },
            )
        else:
            # update variable
            self.post(
                f"/w/{self.workspace}/variables/update/{path}",
                json={"value": value},
            )

    def get_resource(
        self,
        path: str,
        none_if_undefined: bool = False,
    ) -> dict | None:
        """Get resource from Windmill"""
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            resources = self.mocked_api["resources"]
            try:
                return resources[path]
            except KeyError:
                # NOTE: should mocked_api respect `none_if_undefined`?
                if none_if_undefined:
                    logger.info(
                        f"resource not found at {path}, but none_if_undefined is True, so returning None"
                    )
                    return None
                logger.info(
                    f"MockedAPI present, but resource not found at {path}, falling back to real API"
                )
        try:
            return self.get(
                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
            ).json()
        except Exception as e:
            if none_if_undefined:
                return None
            logger.error(e)
            raise e

    def set_resource(
        self,
        value: Any,
        path: str,
        resource_type: str,
    ):
        """Set resource on Windmill, creating it if it does not exist"""
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["resources"][path] = value
            return

        # check if resource exists
        r = self.get(
            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create resource
            self.post(
                f"/w/{self.workspace}/resources/create",
                json={
                    "path": path,
                    "value": value,
                    "resource_type": resource_type,
                },
            )
        else:
            # update resource
            self.post(
                f"/w/{self.workspace}/resources/update_value/{path}",
                json={"value": value},
            )
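
    # --- Usage sketch (illustrative; paths are hypothetical) ---
    # Both plain paths and prefixed references are accepted, thanks to
    # parse_variable_syntax / parse_resource_syntax (defined at module level):
    #
    #     client.set_variable("u/alice/my_var", "hello")
    #     client.get_variable("var://u/alice/my_var")   # same variable
    #     client.set_resource({"host": "db"}, path="u/alice/my_db", resource_type="postgresql")
    #     client.get_resource("res://u/alice/my_db")    # the "$res:" prefix also works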

    def list_resources(
        self,
        resource_type: str = None,
        page: int = None,
        per_page: int = None,
    ) -> list[dict]:
        """List resources from Windmill workspace.

        Args:
            resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
            page: Optional page number for pagination
            per_page: Optional number of results per page

        Returns:
            List of resource dictionaries
        """
        params = {}
        if resource_type is not None:
            params["resource_type"] = resource_type
        if page is not None:
            params["page"] = page
        if per_page is not None:
            params["per_page"] = per_page

        return self.get(
            f"/w/{self.workspace}/resources/list",
            params=params if params else None,
        ).json()

    def set_state(self, value: Any):
        self.set_resource(value, path=self.state_path, resource_type="state")

    def set_progress(self, value: int, job_id: Optional[str] = None):
        workspace = get_workspace()
        flow_id = os.environ.get("WM_FLOW_JOB_ID")
        job_id = job_id or os.environ.get("WM_JOB_ID")

        if job_id is not None:
            job = self.get_job(job_id)
            flow_id = job.get("parent_job")

        self.post(
            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
            json={
                "percent": value,
                "flow_job_id": flow_id or None,
            },
        )

    def get_progress(self, job_id: Optional[str] = None) -> Any:
        workspace = get_workspace()
        job_id = job_id or os.environ.get("WM_JOB_ID")

        # raise_for_status=False so the 404 branch below is reachable
        r = self.get(
            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {job_id} does not exist")
            return None
        else:
            return r.json()

    def set_flow_user_state(self, key: str, value: Any) -> None:
        """Set the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.post(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            json=value,
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")

    def get_flow_user_state(self, key: str) -> Any:
        """Get the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.get(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")
            return None
        else:
            return r.json()

    @property
    def version(self):
        return self.get("version").text

    def get_duckdb_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> DuckDbConnectionSettings | None:
        """
        Convenience helper that takes an S3 resource as input and returns the settings necessary to
        initiate an S3 connection from DuckDB
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            raw_obj = self.post(
                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return DuckDbConnectionSettings(raw_obj)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate DuckDB S3 connection settings from the provided resource"
            ) from e
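
    # --- Usage sketch (illustrative) ---
    # Reporting progress from inside a running job (WM_JOB_ID is set by the
    # worker), then reading it back:
    #
    #     client.set_progress(50)           # percent of the current job
    #     progress = client.get_progress()  # read it back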
f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings", 679 json={} 680 if s3_resource_path == "" 681 else {"s3_resource_path": s3_resource_path}, 682 ).json() 683 return PolarsConnectionSettings(raw_obj) 684 except JSONDecodeError as e: 685 raise Exception( 686 "Could not generate Polars S3 connection settings from the provided resource" 687 ) from e 688 689 def get_boto3_connection_settings( 690 self, 691 s3_resource_path: str = "", 692 ) -> Boto3ConnectionSettings: 693 """ 694 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 695 initiate an S3 connection using boto3 696 """ 697 s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path 698 try: 699 s3_resource = self.post( 700 f"/w/{self.workspace}/job_helpers/v2/s3_resource_info", 701 json={} 702 if s3_resource_path == "" 703 else {"s3_resource_path": s3_resource_path}, 704 ).json() 705 return self.__boto3_connection_settings(s3_resource) 706 except JSONDecodeError as e: 707 raise Exception( 708 "Could not generate Boto3 S3 connection settings from the provided resource" 709 ) from e 710 711 def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes: 712 """ 713 Load a file from the workspace s3 bucket and returns its content as bytes. 714 715 '''python 716 from wmill import S3Object 717 718 s3_obj = S3Object(s3="/path/to/my_file.txt") 719 my_obj_content = client.load_s3_file(s3_obj) 720 file_content = my_obj_content.decode("utf-8") 721 ''' 722 """ 723 s3object = parse_s3_object(s3object) 724 with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 725 return file_reader.read() 726 727 def load_s3_file_reader( 728 self, s3object: S3Object | str, s3_resource_path: str | None 729 ) -> BufferedReader: 730 """ 731 Load a file from the workspace s3 bucket and returns the bytes stream. 732 733 '''python 734 from wmill import S3Object 735 736 s3_obj = S3Object(s3="/path/to/my_file.txt") 737 with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader: 738 print(file_reader.read()) 739 ''' 740 """ 741 s3object = parse_s3_object(s3object) 742 reader = S3BufferedReader( 743 f"{self.workspace}", 744 self.client, 745 s3object["s3"], 746 s3_resource_path, 747 s3object["storage"] if "storage" in s3object else None, 748 ) 749 return reader 750 751 def write_s3_file( 752 self, 753 s3object: S3Object | str | None, 754 file_content: BufferedReader | bytes, 755 s3_resource_path: str | None, 756 content_type: str | None = None, 757 content_disposition: str | None = None, 758 ) -> S3Object: 759 """ 760 Write a file to the workspace S3 bucket 761 762 '''python 763 from wmill import S3Object 764 765 s3_obj = S3Object(s3="/path/to/my_file.txt") 766 767 # for an in memory bytes array: 768 file_content = b'Hello Windmill!' 769 client.write_s3_file(s3_obj, file_content) 770 771 # for a file: 772 with open("my_file.txt", "rb") as my_file: 773 client.write_s3_file(s3_obj, my_file) 774 ''' 775 """ 776 s3object = parse_s3_object(s3object) 777 # httpx accepts either bytes or "a bytes generator" as content. 

    def write_s3_file(
        self,
        s3object: S3Object | str | None,
        file_content: BufferedReader | bytes,
        s3_resource_path: str | None = None,
        content_type: str | None = None,
        content_disposition: str | None = None,
    ) -> S3Object:
        """
        Write a file to the workspace S3 bucket

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")

        # for an in-memory bytes array:
        file_content = b'Hello Windmill!'
        client.write_s3_file(s3_obj, file_content)

        # for a file:
        with open("my_file.txt", "rb") as my_file:
            client.write_s3_file(s3_obj, my_file)
        '''
        """
        s3object = parse_s3_object(s3object)
        # httpx accepts either bytes or a bytes generator as content.
        # If it's a BufferedReader, we need to convert it to a generator.
        if isinstance(file_content, BufferedReader):
            content_payload = bytes_generator(file_content)
        elif isinstance(file_content, bytes):
            content_payload = file_content
        else:
            raise Exception("Type of file_content not supported")

        query_params = {}
        if s3object is not None and s3object["s3"] != "":
            query_params["file_key"] = s3object["s3"]
        if s3_resource_path is not None and s3_resource_path != "":
            query_params["s3_resource_path"] = s3_resource_path
        if (
            s3object is not None
            and "storage" in s3object
            and s3object["storage"] is not None
        ):
            query_params["storage"] = s3object["storage"]
        if content_type is not None:
            query_params["content_type"] = content_type
        if content_disposition is not None:
            query_params["content_disposition"] = content_disposition

        try:
            # need a vanilla client b/c content-type is not application/json here
            response = httpx.post(
                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
                headers={
                    "Authorization": f"Bearer {self.token}",
                    "Content-Type": "application/octet-stream",
                },
                params=query_params,
                content=content_payload,
                verify=self.verify,
                timeout=None,
            ).json()
        except Exception as e:
            raise Exception("Could not write file to S3") from e
        return S3Object(s3=response["file_key"])

    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
        return self.post(
            f"/w/{self.workspace}/apps/sign_s3_objects",
            json={"s3_objects": list(map(parse_s3_object, s3_objects))},
        ).json()

    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
        return self.post(
            f"/w/{self.workspace}/apps/sign_s3_objects",
            json={"s3_objects": [parse_s3_object(s3_object)]},
        ).json()[0]

    def get_presigned_s3_public_urls(
        self,
        s3_objects: list[S3Object | str],
        base_url: str | None = None,
    ) -> list[str]:
        """
        Generate presigned public URLs for an array of S3 objects.
        If an S3 object is not signed yet, it will be signed first.

        Args:
            s3_objects: List of S3 objects to sign
            base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

        Returns:
            List of signed public URLs

        Example:
            >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
            >>> urls = client.get_presigned_s3_public_urls(s3_objs)
        """
        base_url = base_url or self._get_public_base_url()

        s3_objs = [parse_s3_object(s3_obj) for s3_obj in s3_objects]

        # Sign all S3 objects that need to be signed in one go
        s3_objs_to_sign: list[tuple[S3Object, int]] = [
            (s3_obj, index)
            for index, s3_obj in enumerate(s3_objs)
            if s3_obj.get("presigned") is None
        ]

        if s3_objs_to_sign:
            print(f"Signing {len(s3_objs_to_sign)} S3 objects...")
            signed_s3_objs = self.sign_s3_objects(
                [s3_obj for s3_obj, _ in s3_objs_to_sign]
            )
            for i, (_, original_index) in enumerate(s3_objs_to_sign):
                s3_objs[original_index] = parse_s3_object(signed_s3_objs[i])

        signed_urls: list[str] = []
        for s3_obj in s3_objs:
            s3 = s3_obj.get("s3", "")
            presigned = s3_obj.get("presigned", "")
            storage = s3_obj.get("storage", "_default_")
            signed_url = f"{base_url}/api/w/{self.workspace}/s3_proxy/{storage}/{s3}?{presigned}"
            signed_urls.append(signed_url)

        return signed_urls

    def get_presigned_s3_public_url(
        self,
        s3_object: S3Object | str,
        base_url: str | None = None,
    ) -> str:
        """
        Generate a presigned public URL for an S3 object.
        If the S3 object is not signed yet, it will be signed first.

        Args:
            s3_object: S3 object to sign
            base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

        Returns:
            Signed public URL

        Example:
            >>> s3_obj = S3Object(s3="/path/to/file.txt")
            >>> url = client.get_presigned_s3_public_url(s3_obj)
        """
        urls = self.get_presigned_s3_public_urls([s3_object], base_url)
        return urls[0]

    def _get_public_base_url(self) -> str:
        """Get the public base URL from environment or default to localhost"""
        return os.environ.get("WM_BASE_URL", "http://localhost:3000")

    def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings:
        endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://"
        return Boto3ConnectionSettings(
            {
                "endpoint_url": "{}{}".format(
                    endpoint_url_prefix, s3_resource["endPoint"]
                ),
                "region_name": s3_resource["region"],
                "use_ssl": s3_resource["useSSL"],
                "aws_access_key_id": s3_resource["accessKey"],
                "aws_secret_access_key": s3_resource["secretKey"],
                # no need for path_style here as boto3 is clever enough to determine which one to use
            }
        )

    def whoami(self) -> dict:
        return self.get("/users/whoami").json()

    @property
    def user(self) -> dict:
        return self.whoami()

    @property
    def state_path(self) -> str:
        state_path = os.environ.get(
            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
        )
        if state_path is None:
            raise Exception("State path not found")
        return state_path

    @property
    def state(self) -> Any:
        return self.get_resource(path=self.state_path, none_if_undefined=True)

    @state.setter
    def state(self, value: Any) -> None:
        self.set_state(value)

    @staticmethod
    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
        """
        Set the state in the shared folder using pickle
        """
        import pickle

        with open(f"/shared/{path}", "wb") as handle:
            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
        """
        Get the state in the shared folder using pickle
        """
        import pickle

        with open(f"/shared/{path}", "rb") as handle:
            return pickle.load(handle)

    @staticmethod
    def set_shared_state(value: Any, path: str = "state.json") -> None:
        """
        Set the state in the shared folder as JSON
        """
        import json

        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
            json.dump(value, f, ensure_ascii=False, indent=4)

    @staticmethod
    def get_shared_state(path: str = "state.json") -> Any:
        """
        Get the state in the shared folder as JSON
        """
        import json

        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
            return json.load(f)

    def get_resume_urls(self, approver: str = None) -> dict:
        nonce = random.randint(0, 1000000000)
        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
        return self.get(
            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
            params={"approver": approver},
        ).json()
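
    # --- Usage sketch (illustrative; assumes the /shared folder is shared
    # between steps of the same flow running on the same worker) ---
    #
    #     Windmill.set_shared_state({"rows": 42}, path="step_a.json")
    #     ...
    #     state = Windmill.get_shared_state(path="step_a.json")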
1038 """ 1039 workspace = self.workspace 1040 flow_job_id = os.environ.get("WM_FLOW_JOB_ID") 1041 1042 if not flow_job_id: 1043 raise Exception( 1044 "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview." 1045 ) 1046 1047 # Only include non-empty parameters 1048 params = {} 1049 if message: 1050 params["message"] = message 1051 if approver: 1052 params["approver"] = approver 1053 if slack_resource_path: 1054 params["slack_resource_path"] = slack_resource_path 1055 if channel_id: 1056 params["channel_id"] = channel_id 1057 if os.environ.get("WM_FLOW_STEP_ID"): 1058 params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID") 1059 if default_args_json: 1060 params["default_args_json"] = json.dumps(default_args_json) 1061 if dynamic_enums_json: 1062 params["dynamic_enums_json"] = json.dumps(dynamic_enums_json) 1063 1064 self.get( 1065 f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}", 1066 params=params, 1067 ) 1068 1069 def username_to_email(self, username: str) -> str: 1070 """ 1071 Get email from workspace username 1072 This method is particularly useful for apps that require the email address of the viewer. 1073 Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app. 1074 """ 1075 return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text 1076 1077 def send_teams_message( 1078 self, 1079 conversation_id: str, 1080 text: str, 1081 success: bool = True, 1082 card_block: dict = None, 1083 ): 1084 """ 1085 Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message 1086 """ 1087 return self.post( 1088 f"/teams/activities", 1089 json={ 1090 "conversation_id": conversation_id, 1091 "text": text, 1092 "success": success, 1093 "card_block": card_block, 1094 }, 1095 ) 1096 1097 def datatable(self, name: str = "main"): 1098 return DataTableClient(self, name) 1099 1100 def ducklake(self, name: str = "main"): 1101 return DucklakeClient(self, name) 1102 1103 1104 1105def init_global_client(f): 1106 @functools.wraps(f) 1107 def wrapper(*args, **kwargs): 1108 global _client 1109 if _client is None: 1110 _client = Windmill() 1111 return f(*args, **kwargs) 1112 1113 return wrapper 1114 1115 1116def deprecate(in_favor_of: str): 1117 def decorator(f): 1118 @functools.wraps(f) 1119 def wrapper(*args, **kwargs): 1120 warnings.warn( 1121 ( 1122 f"The '{f.__name__}' method is deprecated and may be removed in the future. 
" 1123 f"Consider {in_favor_of}" 1124 ), 1125 DeprecationWarning, 1126 ) 1127 return f(*args, **kwargs) 1128 1129 return wrapper 1130 1131 return decorator 1132 1133 1134@init_global_client 1135def get_workspace() -> str: 1136 return _client.workspace 1137 1138 1139@init_global_client 1140def get_root_job_id(job_id: str | None = None) -> str: 1141 return _client.get_root_job_id(job_id) 1142 1143 1144@init_global_client 1145@deprecate("Windmill().version") 1146def get_version() -> str: 1147 return _client.version 1148 1149 1150@init_global_client 1151def run_script_async( 1152 hash_or_path: str, 1153 args: Dict[str, Any] = None, 1154 scheduled_in_secs: int = None, 1155) -> str: 1156 is_path = "/" in hash_or_path 1157 hash_ = None if is_path else hash_or_path 1158 path = hash_or_path if is_path else None 1159 return _client.run_script_async( 1160 hash_=hash_, 1161 path=path, 1162 args=args, 1163 scheduled_in_secs=scheduled_in_secs, 1164 ) 1165 1166 1167@init_global_client 1168def run_flow_async( 1169 path: str, 1170 args: Dict[str, Any] = None, 1171 scheduled_in_secs: int = None, 1172 # can only be set to false if this the job will be fully await and not concurrent with any other job 1173 # as otherwise the child flow and its own child will store their state in the parent job which will 1174 # lead to incorrectness and failures 1175 do_not_track_in_parent: bool = True, 1176) -> str: 1177 return _client.run_flow_async( 1178 path=path, 1179 args=args, 1180 scheduled_in_secs=scheduled_in_secs, 1181 do_not_track_in_parent=do_not_track_in_parent, 1182 ) 1183 1184 1185@init_global_client 1186def run_script_sync( 1187 hash: str, 1188 args: Dict[str, Any] = None, 1189 verbose: bool = False, 1190 assert_result_is_not_none: bool = True, 1191 cleanup: bool = True, 1192 timeout: dt.timedelta = None, 1193) -> Any: 1194 return _client.run_script( 1195 hash_=hash, 1196 args=args, 1197 verbose=verbose, 1198 assert_result_is_not_none=assert_result_is_not_none, 1199 cleanup=cleanup, 1200 timeout=timeout, 1201 ) 1202 1203 1204@init_global_client 1205def run_script_by_path_async( 1206 path: str, 1207 args: Dict[str, Any] = None, 1208 scheduled_in_secs: Union[None, int] = None, 1209) -> str: 1210 return _client.run_script_by_path_async( 1211 path=path, 1212 args=args, 1213 scheduled_in_secs=scheduled_in_secs, 1214 ) 1215 1216 1217@init_global_client 1218def run_script_by_hash_async( 1219 hash_: str, 1220 args: Dict[str, Any] = None, 1221 scheduled_in_secs: Union[None, int] = None, 1222) -> str: 1223 return _client.run_script_by_hash_async( 1224 hash_=hash_, 1225 args=args, 1226 scheduled_in_secs=scheduled_in_secs, 1227 ) 1228 1229 1230@init_global_client 1231def run_script_by_path_sync( 1232 path: str, 1233 args: Dict[str, Any] = None, 1234 verbose: bool = False, 1235 assert_result_is_not_none: bool = True, 1236 cleanup: bool = True, 1237 timeout: dt.timedelta = None, 1238) -> Any: 1239 return _client.run_script( 1240 path=path, 1241 args=args, 1242 verbose=verbose, 1243 assert_result_is_not_none=assert_result_is_not_none, 1244 cleanup=cleanup, 1245 timeout=timeout, 1246 ) 1247 1248 1249@init_global_client 1250def get_id_token(audience: str) -> str: 1251 """ 1252 Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc. 
1253 """ 1254 return _client.get_id_token(audience) 1255 1256 1257@init_global_client 1258def get_job_status(job_id: str) -> JobStatus: 1259 return _client.get_job_status(job_id) 1260 1261 1262@init_global_client 1263def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]: 1264 return _client.get_result( 1265 job_id=job_id, assert_result_is_not_none=assert_result_is_not_none 1266 ) 1267 1268 1269@init_global_client 1270def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings: 1271 """ 1272 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1273 initiate an S3 connection from DuckDB 1274 """ 1275 return _client.get_duckdb_connection_settings(s3_resource_path) 1276 1277 1278@init_global_client 1279def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings: 1280 """ 1281 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1282 initiate an S3 connection from Polars 1283 """ 1284 return _client.get_polars_connection_settings(s3_resource_path) 1285 1286 1287@init_global_client 1288def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings: 1289 """ 1290 Convenient helpers that takes an S3 resource as input and returns the settings necessary to 1291 initiate an S3 connection using boto3 1292 """ 1293 return _client.get_boto3_connection_settings(s3_resource_path) 1294 1295 1296@init_global_client 1297def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes: 1298 """ 1299 Load the entire content of a file stored in S3 as bytes 1300 """ 1301 return _client.load_s3_file( 1302 s3object, s3_resource_path if s3_resource_path != "" else None 1303 ) 1304 1305 1306@init_global_client 1307def load_s3_file_reader( 1308 s3object: S3Object | str, s3_resource_path: str | None = None 1309) -> BufferedReader: 1310 """ 1311 Load the content of a file stored in S3 1312 """ 1313 return _client.load_s3_file_reader( 1314 s3object, s3_resource_path if s3_resource_path != "" else None 1315 ) 1316 1317 1318@init_global_client 1319def write_s3_file( 1320 s3object: S3Object | str | None, 1321 file_content: BufferedReader | bytes, 1322 s3_resource_path: str | None = None, 1323 content_type: str | None = None, 1324 content_disposition: str | None = None, 1325) -> S3Object: 1326 """ 1327 Upload a file to S3 1328 1329 Content type will be automatically guessed from path extension if left empty 1330 1331 See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition 1332 and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type 1333 1334 """ 1335 return _client.write_s3_file( 1336 s3object, 1337 file_content, 1338 s3_resource_path if s3_resource_path != "" else None, 1339 content_type, 1340 content_disposition, 1341 ) 1342 1343 1344@init_global_client 1345def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]: 1346 """ 1347 Sign S3 objects to be used by anonymous users in public apps 1348 Returns a list of signed s3 tokens 1349 """ 1350 return _client.sign_s3_objects(s3_objects) 1351 1352 1353@init_global_client 1354def sign_s3_object(s3_object: S3Object| str) -> S3Object: 1355 """ 1356 Sign S3 object to be used by anonymous users in public apps 1357 Returns a signed s3 object 1358 """ 1359 return _client.sign_s3_object(s3_object) 1360 1361 1362@init_global_client 1363def get_presigned_s3_public_urls( 1364 s3_objects: 


@init_global_client
def get_presigned_s3_public_urls(
    s3_objects: list[S3Object | str],
    base_url: str | None = None,
) -> list[str]:
    """
    Generate presigned public URLs for an array of S3 objects.
    If an S3 object is not signed yet, it will be signed first.

    Args:
        s3_objects: List of S3 objects to sign
        base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

    Returns:
        List of signed public URLs

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
        >>> urls = wmill.get_presigned_s3_public_urls(s3_objs)
    """
    return _client.get_presigned_s3_public_urls(s3_objects, base_url)


@init_global_client
def get_presigned_s3_public_url(
    s3_object: S3Object | str,
    base_url: str | None = None,
) -> str:
    """
    Generate a presigned public URL for an S3 object.
    If the S3 object is not signed yet, it will be signed first.

    Args:
        s3_object: S3 object to sign
        base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

    Returns:
        Signed public URL

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_obj = S3Object(s3="/path/to/file.txt")
        >>> url = wmill.get_presigned_s3_public_url(s3_obj)
    """
    return _client.get_presigned_s3_public_url(s3_object, base_url)


@init_global_client
def whoami() -> dict:
    """
    Returns the current user
    """
    return _client.user


@init_global_client
@deprecate("Windmill().state")
def get_state() -> Any:
    """
    Get the state
    """
    return _client.state


@init_global_client
def get_resource(
    path: str,
    none_if_undefined: bool = False,
) -> dict | None:
    """Get resource from Windmill"""
    return _client.get_resource(path, none_if_undefined)


@init_global_client
def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
    """
    Set the resource at a given path as a string, creating it if it does not exist
    """
    return _client.set_resource(value=value, path=path, resource_type=resource_type)


@init_global_client
def list_resources(
    resource_type: str = None,
    page: int = None,
    per_page: int = None,
) -> list[dict]:
    """List resources from Windmill workspace.

    Args:
        resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
        page: Optional page number for pagination
        per_page: Optional number of results per page

    Returns:
        List of resource dictionaries

    Example:
        >>> # Get all resources
        >>> all_resources = wmill.list_resources()

        >>> # Get only PostgreSQL resources
        >>> pg_resources = wmill.list_resources(resource_type="postgresql")
    """
    return _client.list_resources(
        resource_type=resource_type,
        page=page,
        per_page=per_page,
    )


@init_global_client
def set_state(value: Any) -> None:
    """
    Set the state
    """
    return _client.set_state(value)


@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
    """
    Set the progress
    """
    return _client.set_progress(value, job_id)


@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
    """
    Get the progress
    """
    return _client.get_progress(job_id)


def set_shared_state_pickle(value: Any, path="state.pickle") -> None:
    """
    Set the state in the shared folder using pickle
    """
    return Windmill.set_shared_state_pickle(value=value, path=path)


@deprecate("Windmill.get_shared_state_pickle(...)")
def get_shared_state_pickle(path="state.pickle") -> Any:
    """
    Get the state in the shared folder using pickle
    """
    return Windmill.get_shared_state_pickle(path=path)


def set_shared_state(value: Any, path="state.json") -> None:
    """
    Set the state in the shared folder as JSON
    """
    return Windmill.set_shared_state(value=value, path=path)


def get_shared_state(path="state.json") -> Any:
    """
    Get the state in the shared folder as JSON
    """
    return Windmill.get_shared_state(path=path)


@init_global_client
def get_variable(path: str) -> str:
    """
    Returns the variable at a given path as a string
    """
    return _client.get_variable(path)


@init_global_client
def set_variable(path: str, value: str, is_secret: bool = False) -> None:
    """
    Set the variable at a given path as a string, creating it if it does not exist
    """
    return _client.set_variable(path, value, is_secret)


@init_global_client
def get_flow_user_state(key: str) -> Any:
    """
    Get the user state of a flow at a given key
    """
    return _client.get_flow_user_state(key)


@init_global_client
def set_flow_user_state(key: str, value: Any) -> None:
    """
    Set the user state of a flow at a given key
    """
    return _client.set_flow_user_state(key, value)


@init_global_client
def get_state_path() -> str:
    return _client.state_path


@init_global_client
def get_resume_urls(approver: str = None) -> dict:
    return _client.get_resume_urls(approver)
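

# --- Usage sketch (illustrative; only meaningful inside a running flow,
# since the state is keyed on the root flow job) ---
#
#     set_flow_user_state("processed_ids", [1, 2, 3])
#     ids = get_flow_user_state("processed_ids")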


@init_global_client
def request_interactive_slack_approval(
    slack_resource_path: str,
    channel_id: str,
    message: str = None,
    approver: str = None,
    default_args_json: dict = None,
    dynamic_enums_json: dict = None,
) -> None:
    return _client.request_interactive_slack_approval(
        slack_resource_path=slack_resource_path,
        channel_id=channel_id,
        message=message,
        approver=approver,
        default_args_json=default_args_json,
        dynamic_enums_json=dynamic_enums_json,
    )


@init_global_client
def send_teams_message(
    conversation_id: str, text: str, success: bool, card_block: dict = None
):
    return _client.send_teams_message(conversation_id, text, success, card_block)


@init_global_client
def cancel_job(job_id: str, reason: str = None) -> str:
    """Cancel a specific job by ID.

    Args:
        job_id: UUID of the job to cancel
        reason: Optional reason for cancellation

    Returns:
        Response message from the cancel endpoint
    """
    return _client.cancel_job(job_id, reason)


@init_global_client
def cancel_running() -> dict:
    """Cancel currently running executions of the same script."""
    return _client.cancel_running()


@init_global_client
def run_script(
    path: str = None,
    hash_: str = None,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script synchronously and return its result.

    .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
    """
    return _client.run_script(
        path=path,
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )


@init_global_client
def run_script_by_path(
    path: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by path synchronously and return its result."""
    return _client.run_script_by_path(
        path=path,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )


@init_global_client
def run_script_by_hash(
    hash_: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by hash synchronously and return its result."""
    return _client.run_script_by_hash(
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )


@init_global_client
def run_inline_script_preview(
    content: str,
    language: str,
    args: dict = None,
) -> Any:
    """Run a script on the current worker without creating a job"""
    return _client.run_inline_script_preview(
        content=content,
        language=language,
        args=args,
    )
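

# --- Usage sketch (illustrative) ---
# Run a throwaway snippet on the current worker; "python3" is assumed to be
# the language tag Windmill uses for Python scripts:
#
#     out = run_inline_script_preview(
#         content="def main(x: int):\n    return x * 2",
#         language="python3",
#         args={"x": 21},
#     )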
1700 """ 1701 return _client.username_to_email(username) 1702 1703 1704@init_global_client 1705def datatable(name: str = "main") -> DataTableClient: 1706 return _client.datatable(name) 1707 1708@init_global_client 1709def ducklake(name: str = "main") -> DucklakeClient: 1710 return _client.ducklake(name) 1711 1712def task(*args, **kwargs): 1713 from inspect import signature 1714 1715 def f(func, tag: str | None = None): 1716 if ( 1717 os.environ.get("WM_JOB_ID") is None 1718 or os.environ.get("MAIN_OVERRIDE") == func.__name__ 1719 ): 1720 1721 def inner(*args, **kwargs): 1722 return func(*args, **kwargs) 1723 1724 return inner 1725 else: 1726 1727 def inner(*args, **kwargs): 1728 global _client 1729 if _client is None: 1730 _client = Windmill() 1731 w_id = os.environ.get("WM_WORKSPACE") 1732 job_id = os.environ.get("WM_JOB_ID") 1733 f_name = func.__name__ 1734 json = kwargs 1735 params = list(signature(func).parameters) 1736 for i, arg in enumerate(args): 1737 if i < len(params): 1738 p = params[i] 1739 key = p 1740 if key not in kwargs: 1741 json[key] = arg 1742 1743 params = {} 1744 if tag is not None: 1745 params["tag"] = tag 1746 w_as_code_response = _client.post( 1747 f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}", 1748 json={"args": json}, 1749 params=params, 1750 ) 1751 job_id = w_as_code_response.text 1752 print(f"Executing task {func.__name__} on job {job_id}") 1753 job_result = _client.wait_job(job_id) 1754 print(f"Task {func.__name__} ({job_id}) completed") 1755 return job_result 1756 1757 return inner 1758 1759 if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): 1760 return f(args[0], None) 1761 else: 1762 return lambda x: f(x, kwargs.get("tag")) 1763 1764def parse_resource_syntax(s: str) -> Optional[str]: 1765 """Parse resource syntax from string.""" 1766 if s is None: 1767 return None 1768 if s.startswith("$res:"): 1769 return s[5:] 1770 if s.startswith("res://"): 1771 return s[6:] 1772 return None 1773 1774def parse_s3_object(s3_object: S3Object | str) -> S3Object: 1775 """Parse S3 object from string or S3Object format.""" 1776 if isinstance(s3_object, str): 1777 match = re.match(r'^s3://([^/]*)/(.*)$', s3_object) 1778 if match: 1779 return S3Object(s3=match.group(2) or "", storage=match.group(1) or None) 1780 return S3Object(s3="") 1781 else: 1782 return s3_object 1783 1784 1785 1786def parse_variable_syntax(s: str) -> Optional[str]: 1787 """Parse variable syntax from string.""" 1788 if s.startswith("var://"): 1789 return s[6:] 1790 return None 1791 1792 1793def append_to_result_stream(text: str) -> None: 1794 """Append a text to the result stream. 1795 1796 Args: 1797 text: text to append to the result stream 1798 """ 1799 print("WM_STREAM: {}".format(text.replace(chr(10), '\\n'))) 1800 1801def stream_result(stream) -> None: 1802 """Stream to the result stream. 


def stream_result(stream) -> None:
    """Stream an iterable of text chunks to the result stream.

    Args:
        stream: iterable of text chunks to forward to the result stream
    """
    for text in stream:
        append_to_result_stream(text)


class DataTableClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, *args):
        args_dict = {}
        args_def = ""
        for i, arg in enumerate(args):
            args_dict[f"arg{i+1}"] = arg
            args_def += f"-- ${i+1} arg{i+1}\n"
        sql = args_def + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="postgresql",
                args={"database": f"datatable://{self.name}", **args_dict},
            ),
        )


class DucklakeClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, **kwargs):
        args_dict = {}
        args_def = ""
        for key, value in kwargs.items():
            args_dict[key] = value
            args_def += f"-- ${key} ({infer_sql_type(value)})\n"
        attach = f"ATTACH 'ducklake://{self.name}' AS dl;USE dl;\n"
        sql = args_def + attach + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="duckdb",
                args=args_dict,
            ),
        )


class SqlQuery:
    def __init__(self, sql: str, fetch_fn):
        self.sql = sql
        self.fetch_fn = fetch_fn

    def fetch(self, result_collection: str | None = None):
        sql = self.sql
        if result_collection is not None:
            sql = f"-- result_collection={result_collection}\n{sql}"
        return self.fetch_fn(sql)

    def fetch_one(self):
        return self.fetch(result_collection="last_statement_first_row")


def infer_sql_type(value) -> str:
    """
    The DuckDB executor requires explicit argument types at declaration.
    These types exist in both DuckDB and Postgres.
    Check that the types exist if you plan to extend this function to other SQL engines.
    """
    if isinstance(value, bool):
        # Check bool before int since bool is a subclass of int in Python
        return "BOOLEAN"
    elif isinstance(value, int):
        return "BIGINT"
    elif isinstance(value, float):
        return "DOUBLE PRECISION"
    elif value is None:
        return "TEXT"
    elif isinstance(value, str):
        return "TEXT"
    elif isinstance(value, dict) or isinstance(value, list):
        return "JSON"
    else:
        return "TEXT"
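

# --- Usage sketch (illustrative; table and column names are hypothetical) ---
# DataTableClient numbers positional arguments as $1, $2, ... and runs the
# query against "datatable://<name>"; DucklakeClient takes named arguments
# and prefixes the query with an ATTACH of "ducklake://<name>":
#
#     rows = datatable("main").query("SELECT * FROM users WHERE id = $1", 42).fetch()
#     row = ducklake("main").query(
#         "SELECT count(*) AS n FROM events WHERE kind = $kind", kind="click"
#     ).fetch_one()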
class Windmill:
    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
        base = (
            base_url
            or os.environ.get("BASE_INTERNAL_URL")
            or os.environ.get("WM_BASE_URL")
        )

        self.base_url = f"{base}/api"
        self.token = token or os.environ.get("WM_TOKEN")
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        self.verify = verify
        self.client = self.get_client()
        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
        self.path = os.environ.get("WM_JOB_PATH")

        self.mocked_api = self.get_mocked_api()

        assert self.workspace, (
            "workspace required as an argument or as WM_WORKSPACE environment variable"
        )
    def get_mocked_api(self) -> Optional[dict]:
        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
        if not mocked_path:
            return None
        logger.info("Using mocked API from %s", mocked_path)
        mocked_api = {"variables": {}, "resources": {}}
        try:
            with open(mocked_path, "r") as f:
                incoming_mocked_api = json.load(f)
            mocked_api = {**mocked_api, **incoming_mocked_api}
        except Exception as e:
            logger.warning(
                "Error parsing mocked API file at path %s. Using empty mocked API.",
                mocked_path,
            )
            logger.debug(e)
        return mocked_api
    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.get(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp
    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.post(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp
    def run_script_async(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job and return its job id.

        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
        """
        logging.warning(
            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job and return its job id.
Deprecated: use run_script_by_path_async or run_script_by_hash_async instead.
    def run_script_by_path_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by path and return its job id."""
        return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job by path and return its job id.
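A hedged example of queueing a script by path (client is a Windmill instance; the path, arguments, and delay are placeholders):

# Returns immediately with the job id of the queued run.
job_id = client.run_script_by_path_async(
    "u/alex/my_script",
    args={"msg": "hello"},
    scheduled_in_secs=60,  # optional: start the run one minute from now
)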
    def run_script_by_hash_async(
        self,
        hash_: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by hash and return its job id."""
        return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
Create a script job by hash and return its job id.
    def run_flow_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
        # Can only be set to False if the job will be fully awaited and not run
        # concurrently with any other job, as otherwise the child flow and its own
        # children will store their state in the parent job, which will lead to
        # incorrectness and failures.
        do_not_track_in_parent: bool = True,
    ) -> str:
        """Create a flow job and return its job id."""
        args = args or {}
        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
        if not do_not_track_in_parent:
            if os.environ.get("WM_JOB_ID"):
                params["parent_job"] = os.environ.get("WM_JOB_ID")
            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
        if path:
            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
        else:
            raise Exception("path must be provided")
        return self.post(endpoint, json=args, params=params).text
Create a flow job and return its job id.
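A short sketch of queueing a flow; the flow path and arguments are illustrative:

job_id = client.run_flow_async("f/examples/etl", args={"day": "2024-01-01"})
# The flow runs detached; combine with wait_job(job_id) if you need its result.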
    def run_script(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script synchronously and return its result.

        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
        """
        logging.warning(
            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_internal(
            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )
Run script synchronously and return its result.
Deprecated: use run_script_by_path or run_script_by_hash instead.
    def run_script_by_path(
        self,
        path: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by path synchronously and return its result."""
        return self._run_script_internal(
            path=path, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )
Run script by path synchronously and return its result.
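A hedged example of a synchronous run with a timeout; the path and arguments are placeholders:

import datetime as dt

# Blocks until the script finishes; raises TimeoutError (and cancels the job)
# if it does not complete within 30 seconds.
result = client.run_script_by_path(
    "u/alex/my_script",
    args={"msg": "hello"},
    timeout=dt.timedelta(seconds=30),
)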
    def run_script_by_hash(
        self,
        hash_: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by hash synchronously and return its result."""
        return self._run_script_internal(
            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )
Run script by hash synchronously and return its result.
    def run_inline_script_preview(
        self,
        content: str,
        language: str,
        args: dict = None,
    ) -> Any:
        """Run a script on the current worker without creating a job"""
        endpoint = f"/w/{self.workspace}/jobs/run_inline/preview"
        body = {
            "content": content,
            "language": language,
            "args": args or {},
        }
        return self.post(endpoint, json=body).text
Run a script on the current worker without creating a job
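A sketch of an inline preview, assuming the worker accepts "python3" snippets with a main entrypoint like regular script jobs; the snippet itself is illustrative:

out = client.run_inline_script_preview(
    content="def main(x: int): return x * 2",
    language="python3",
    args={"x": 21},
)
# `out` is the raw textual result; no job is created for the preview.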
    def wait_job(
        self,
        job_id,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ):
        def cancel_job():
            logger.warning(f"cancelling job: {job_id}")
            self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                json={"reason": "parent script cancelled"},
            ).raise_for_status()

        if cleanup:
            atexit.register(cancel_job)

        start_time = time.time()

        if isinstance(timeout, dt.timedelta):
            timeout = timeout.total_seconds()

        while True:
            result_res = self.get(
                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
            ).json()

            started = result_res["started"]
            completed = result_res["completed"]
            success = result_res["success"]

            if not started and verbose:
                logger.info(f"job {job_id} has not started yet")

            if cleanup and completed:
                atexit.unregister(cancel_job)

            if completed:
                result = result_res["result"]
                if success:
                    if result is None and assert_result_is_not_none:
                        raise Exception("Result was None")
                    return result
                else:
                    error = result["error"]
                    raise Exception(f"Job {job_id} was not successful: {str(error)}")

            if timeout and ((time.time() - start_time) > timeout):
                msg = "reached timeout"
                logger.warning(msg)
                self.post(
                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                    json={"reason": msg},
                )
                raise TimeoutError(msg)
            if verbose:
                logger.info(f"sleeping 0.5 seconds for {job_id = }")

            time.sleep(0.5)
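wait_job pairs naturally with the *_async runners; a minimal sketch with illustrative values:

job_id = client.run_script_by_path_async("u/alex/my_script", args={"msg": "hi"})
# Polls every 0.5s; cleanup=True cancels the job if this script exits first.
result = client.wait_job(job_id, timeout=60, verbose=True)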
    def cancel_job(self, job_id: str, reason: str = None) -> str:
        """Cancel a specific job by ID.

        Args:
            job_id: UUID of the job to cancel
            reason: Optional reason for cancellation

        Returns:
            Response message from the cancel endpoint
        """
        logger.info(f"cancelling job: {job_id}")

        payload = {"reason": reason or "cancelled via cancel_job method"}

        response = self.post(
            f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
            json=payload,
        )

        return response.text
Cancel a specific job by ID.
Args:
    job_id: UUID of the job to cancel
    reason: Optional reason for cancellation

Returns:
    Response message from the cancel endpoint
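Continuing the sketch above, a queued or running job can be cancelled by its id:

msg = client.cancel_job(job_id, reason="superseded by a newer run")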
    def cancel_running(self) -> dict:
        """Cancel currently running executions of the same script."""
        logger.info("canceling running executions of this script")

        jobs = self.get(
            f"/w/{self.workspace}/jobs/list",
            params={
                "running": "true",
                "script_path_exact": self.path,
            },
        ).json()

        current_job_id = os.environ.get("WM_JOB_ID")

        logger.debug(f"{current_job_id = }")

        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]

        if job_ids:
            logger.info(f"cancelling the following job ids: {job_ids}")
        else:
            logger.info("no previous executions to cancel")

        result = {}

        for id_ in job_ids:
            result[id_] = self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
                json={"reason": "killed by `cancel_running` method"},
            )

        return result
Cancel currently running executions of the same script.
    def get_job_status(self, job_id: str) -> JobStatus:
        job = self.get_job(job_id)
        job_type = job.get("type", "")
        assert job_type, f"{job} is not a valid job"
        if job_type.lower() == "completedjob":
            return "COMPLETED"
        if job.get("running"):
            return "RUNNING"
        return "WAITING"
    def get_result(
        self,
        job_id: str,
        assert_result_is_not_none: bool = True,
    ) -> Any:
        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
        result_text = result.text
        if assert_result_is_not_none and result_text is None:
            raise Exception(f"result is None for {job_id = }")
        try:
            return result.json()
        except JSONDecodeError:
            return result_text
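A hedged polling sketch combining get_job_status and get_result; job_id is assumed to come from one of the *_async runners:

import time

while client.get_job_status(job_id) != "COMPLETED":
    time.sleep(0.5)
result = client.get_result(job_id, assert_result_is_not_none=False)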
    def get_variable(self, path: str) -> str:
        """Get variable from Windmill"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            variables = self.mocked_api["variables"]
            try:
                result = variables[path]
                return result
            except KeyError:
                logger.info(
                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
                )
        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()
    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
        """Set variable from Windmill"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["variables"][path] = value
            return

        # check if variable exists
        r = self.get(
            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create variable
            self.post(
                f"/w/{self.workspace}/variables/create",
                json={
                    "path": path,
                    "value": value,
                    "is_secret": is_secret,
                    "description": "",
                },
            )
        else:
            # update variable
            self.post(
                f"/w/{self.workspace}/variables/update/{path}",
                json={"value": value},
            )
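A round-trip sketch; the variable path and value are illustrative:

client.set_variable("u/alex/my_var", "some value")
value = client.get_variable("u/alex/my_var")  # returns "some value"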
    def get_resource(
        self,
        path: str,
        none_if_undefined: bool = False,
    ) -> dict | None:
        """Get resource from Windmill"""
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            resources = self.mocked_api["resources"]
            try:
                result = resources[path]
                return result
            except KeyError:
                # NOTE: should mocked_api respect `none_if_undefined`?
                if none_if_undefined:
                    logger.info(
                        f"resource not found at {path}, but none_if_undefined is True, so returning None"
                    )
                    return None
                logger.info(
                    f"MockedAPI present, but resource not found at {path}, falling back to real API"
                )
        try:
            return self.get(
                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
            ).json()
        except Exception as e:
            if none_if_undefined:
                return None
            logger.error(e)
            raise e
    def set_resource(
        self,
        value: Any,
        path: str,
        resource_type: str,
    ):
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["resources"][path] = value
            return

        # check if resource exists
        r = self.get(
            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create resource
            self.post(
                f"/w/{self.workspace}/resources/create",
                json={
                    "path": path,
                    "value": value,
                    "resource_type": resource_type,
                },
            )
        else:
            # update resource
            self.post(
                f"/w/{self.workspace}/resources/update_value/{path}",
                json={"value": value},
            )
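A round-trip sketch for resources; the path, type, and value are illustrative:

client.set_resource(
    {"host": "db.example.com", "port": 5432},
    path="u/alex/my_postgres",
    resource_type="postgresql",
)
db_conf = client.get_resource("u/alex/my_postgres")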
    def list_resources(
        self,
        resource_type: str = None,
        page: int = None,
        per_page: int = None,
    ) -> list[dict]:
        """List resources from the Windmill workspace.

        Args:
            resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
            page: Optional page number for pagination
            per_page: Optional number of results per page

        Returns:
            List of resource dictionaries
        """
        params = {}
        if resource_type is not None:
            params["resource_type"] = resource_type
        if page is not None:
            params["page"] = page
        if per_page is not None:
            params["per_page"] = per_page

        return self.get(
            f"/w/{self.workspace}/resources/list",
            params=params if params else None,
        ).json()
List resources from the Windmill workspace.
Args:
    resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
    page: Optional page number for pagination
    per_page: Optional number of results per page

Returns:
    List of resource dictionaries
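For example, fetching the first page of PostgreSQL resources (filter and page sizes are illustrative):

pg_resources = client.list_resources(resource_type="postgresql", page=1, per_page=20)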
    def set_progress(self, value: int, job_id: Optional[str] = None):
        workspace = get_workspace()
        flow_id = os.environ.get("WM_FLOW_JOB_ID")
        job_id = job_id or os.environ.get("WM_JOB_ID")

        if job_id is not None:
            job = self.get_job(job_id)
            flow_id = job.get("parent_job")

        self.post(
            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
            json={
                "percent": value,
                "flow_job_id": flow_id or None,
            },
        )
    def get_progress(self, job_id: Optional[str] = None) -> Any:
        workspace = get_workspace()
        job_id = job_id or os.environ.get("WM_JOB_ID")

        r = self.get(
            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
        )
        if r.status_code == 404:
            print(f"Job {job_id} does not exist")
            return None
        else:
            return r.json()
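A sketch of reporting progress from inside a running job; the percentage is illustrative, and with no job_id both methods default to the current WM_JOB_ID:

client.set_progress(50)
pct = client.get_progress()  # -> 50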
    def set_flow_user_state(self, key: str, value: Any) -> None:
        """Set the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.post(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            json=value,
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")
Set the user state of a flow at a given key
    def get_flow_user_state(self, key: str) -> Any:
        """Get the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.get(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")
            return None
        else:
            return r.json()
Get the user state of a flow at a given key
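A sketch of sharing state between steps of the same flow; key and value are illustrative:

# In an early step:
client.set_flow_user_state("cursor", {"last_id": 42})

# In a later step of the same flow:
cursor = client.get_flow_user_state("cursor")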
    def get_duckdb_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> DuckDbConnectionSettings | None:
        """
        Convenience helper that takes an S3 resource as input and returns the settings
        necessary to initiate an S3 connection from DuckDB
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            raw_obj = self.post(
                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return DuckDbConnectionSettings(raw_obj)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate DuckDB S3 connection settings from the provided resource"
            ) from e
Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB.
    def get_polars_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> PolarsConnectionSettings:
        """
        Convenience helper that takes an S3 resource as input and returns the settings
        necessary to initiate an S3 connection from Polars
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            raw_obj = self.post(
                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return PolarsConnectionSettings(raw_obj)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate Polars S3 connection settings from the provided resource"
            ) from e
Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars.
    def get_boto3_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> Boto3ConnectionSettings:
        """
        Convenience helper that takes an S3 resource as input and returns the settings
        necessary to initiate an S3 connection using boto3
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            s3_resource = self.post(
                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return self.__boto3_connection_settings(s3_resource)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate Boto3 S3 connection settings from the provided resource"
            ) from e
Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3.
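A hedged sketch of consuming these settings with boto3. It assumes boto3 is installed and that Boto3ConnectionSettings exposes the kwargs dict built above (endpoint_url, region_name, use_ssl, aws_access_key_id, aws_secret_access_key); adapt the unpacking to the actual type in wmill.s3_types:

import boto3

settings = client.get_boto3_connection_settings("u/alex/my_s3_resource")
# Assumption: the settings object can be treated as a plain mapping of boto3 kwargs.
s3 = boto3.client("s3", **dict(settings))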
    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
        """
        Load a file from the workspace S3 bucket and return its content as bytes.

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")
        # pass None to use the workspace's default S3 storage
        my_obj_content = client.load_s3_file(s3_obj, None)
        file_content = my_obj_content.decode("utf-8")
        '''
        """
        s3object = parse_s3_object(s3object)
        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
            return file_reader.read()
Load a file from the workspace S3 bucket and return its content as bytes.
from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
# pass None to use the workspace's default S3 storage
my_obj_content = client.load_s3_file(s3_obj, None)
file_content = my_obj_content.decode("utf-8")
    def load_s3_file_reader(
        self, s3object: S3Object | str, s3_resource_path: str | None
    ) -> BufferedReader:
        """
        Load a file from the workspace S3 bucket and return it as a bytes stream.

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")
        with client.load_s3_file_reader(s3_obj, None) as file_reader:
            print(file_reader.read())
        '''
        """
        s3object = parse_s3_object(s3object)
        reader = S3BufferedReader(
            f"{self.workspace}",
            self.client,
            s3object["s3"],
            s3_resource_path,
            s3object["storage"] if "storage" in s3object else None,
        )
        return reader
Load a file from the workspace S3 bucket and return it as a bytes stream.
from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
# pass None to use the workspace's default S3 storage
with client.load_s3_file_reader(s3_obj, None) as file_reader:
    print(file_reader.read())
    def write_s3_file(
        self,
        s3object: S3Object | str | None,
        file_content: BufferedReader | bytes,
        s3_resource_path: str | None,
        content_type: str | None = None,
        content_disposition: str | None = None,
    ) -> S3Object:
        """
        Write a file to the workspace S3 bucket

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")

        # for an in-memory bytes array (None uses the default S3 storage):
        file_content = b'Hello Windmill!'
        client.write_s3_file(s3_obj, file_content, None)

        # for a file:
        with open("my_file.txt", "rb") as my_file:
            client.write_s3_file(s3_obj, my_file, None)
        '''
        """
        s3object = parse_s3_object(s3object)
        # httpx accepts either bytes or a bytes generator as content.
        # If it's a BufferedReader, we need to convert it to a generator.
        if isinstance(file_content, BufferedReader):
            content_payload = bytes_generator(file_content)
        elif isinstance(file_content, bytes):
            content_payload = file_content
        else:
            raise Exception("Type of file_content not supported")

        query_params = {}
        if s3object is not None and s3object["s3"] != "":
            query_params["file_key"] = s3object["s3"]
        if s3_resource_path is not None and s3_resource_path != "":
            query_params["s3_resource_path"] = s3_resource_path
        if (
            s3object is not None
            and "storage" in s3object
            and s3object["storage"] is not None
        ):
            query_params["storage"] = s3object["storage"]
        if content_type is not None:
            query_params["content_type"] = content_type
        if content_disposition is not None:
            query_params["content_disposition"] = content_disposition

        try:
            # need a vanilla client b/c content-type is not application/json here
            response = httpx.post(
                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
                headers={
                    "Authorization": f"Bearer {self.token}",
                    "Content-Type": "application/octet-stream",
                },
                params=query_params,
                content=content_payload,
                verify=self.verify,
                timeout=None,
            ).json()
        except Exception as e:
            raise Exception("Could not write file to S3") from e
        return S3Object(s3=response["file_key"])
Write a file to the workspace S3 bucket
from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")

# for an in-memory bytes array (None uses the default S3 storage):
file_content = b'Hello Windmill!'
client.write_s3_file(s3_obj, file_content, None)

# for a file:
with open("my_file.txt", "rb") as my_file:
    client.write_s3_file(s3_obj, my_file, None)
    def get_presigned_s3_public_urls(
        self,
        s3_objects: list[S3Object | str],
        base_url: str | None = None,
    ) -> list[str]:
        """
        Generate presigned public URLs for an array of S3 objects.
        If an S3 object is not signed yet, it will be signed first.

        Args:
            s3_objects: List of S3 objects to sign
            base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

        Returns:
            List of signed public URLs

        Example:
            >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
            >>> urls = client.get_presigned_s3_public_urls(s3_objs)
        """
        base_url = base_url or self._get_public_base_url()

        s3_objs = [parse_s3_object(s3_obj) for s3_obj in s3_objects]

        # Sign all S3 objects that need to be signed in one go
        s3_objs_to_sign: list[tuple[S3Object, int]] = [
            (s3_obj, index)
            for index, s3_obj in enumerate(s3_objs)
            if s3_obj.get("presigned") is None
        ]

        if s3_objs_to_sign:
            print(f"Signing {len(s3_objs_to_sign)} S3 objects...")
            signed_s3_objs = self.sign_s3_objects(
                [s3_obj for s3_obj, _ in s3_objs_to_sign]
            )
            for i, (_, original_index) in enumerate(s3_objs_to_sign):
                s3_objs[original_index] = parse_s3_object(signed_s3_objs[i])

        signed_urls: list[str] = []
        for s3_obj in s3_objs:
            s3 = s3_obj.get("s3", "")
            presigned = s3_obj.get("presigned", "")
            storage = s3_obj.get("storage", "_default_")
            signed_url = f"{base_url}/api/w/{self.workspace}/s3_proxy/{storage}/{s3}?{presigned}"
            signed_urls.append(signed_url)

        return signed_urls
Generate presigned public URLs for an array of S3 objects. If an S3 object is not signed yet, it will be signed first.
Args:
    s3_objects: List of S3 objects to sign
    base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

Returns:
    List of signed public URLs

Example:
    >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
    >>> urls = client.get_presigned_s3_public_urls(s3_objs)
    def get_presigned_s3_public_url(
        self,
        s3_object: S3Object | str,
        base_url: str | None = None,
    ) -> str:
        """
        Generate a presigned public URL for an S3 object.
        If the S3 object is not signed yet, it will be signed first.

        Args:
            s3_object: S3 object to sign
            base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

        Returns:
            Signed public URL

        Example:
            >>> s3_obj = S3Object(s3="/path/to/file.txt")
            >>> url = client.get_presigned_s3_public_url(s3_obj)
        """
        urls = self.get_presigned_s3_public_urls([s3_object], base_url)
        return urls[0]
    def request_interactive_slack_approval(
        self,
        slack_resource_path: str,
        channel_id: str,
        message: str = None,
        approver: str = None,
        default_args_json: dict = None,
        dynamic_enums_json: dict = None,
    ) -> None:
        """
        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.

        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form

        :param slack_resource_path: The path to the Slack resource in Windmill.
        :type slack_resource_path: str
        :param channel_id: The Slack channel ID where the approval request will be sent.
        :type channel_id: str
        :param message: Optional custom message to include in the Slack approval request.
        :type message: str, optional
        :param approver: Optional user ID or name of the approver for the request.
        :type approver: str, optional
        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
        :type default_args_json: dict, optional
        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
        :type dynamic_enums_json: dict, optional

        :raises Exception: If the function is not called within a flow or flow preview.
        :raises Exception: If the required flow job or flow step environment variables are not set.

        :return: None

        **Usage Example:**
        >>> client.request_interactive_slack_approval(
        ...     slack_resource_path="/u/alex/my_slack_resource",
        ...     channel_id="admins-slack-channel",
        ...     message="Please approve this request",
        ...     approver="approver123",
        ...     default_args_json={"key1": "value1", "key2": 42},
        ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
        ... )

        **Notes:**
        - This function must be executed within a Windmill flow or flow preview.
        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
        """
        workspace = self.workspace
        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")

        if not flow_job_id:
            raise Exception(
                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
            )

        # Only include non-empty parameters
        params = {}
        if message:
            params["message"] = message
        if approver:
            params["approver"] = approver
        if slack_resource_path:
            params["slack_resource_path"] = slack_resource_path
        if channel_id:
            params["channel_id"] = channel_id
        if os.environ.get("WM_FLOW_STEP_ID"):
            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
        if default_args_json:
            params["default_args_json"] = json.dumps(default_args_json)
        if dynamic_enums_json:
            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)

        self.get(
            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
            params=params,
        )
    def username_to_email(self, username: str) -> str:
        """
        Get email from workspace username.
        This method is particularly useful for apps that require the email address of the viewer.
        Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
        """
        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text
    def send_teams_message(
        self,
        conversation_id: str,
        text: str,
        success: bool = True,
        card_block: dict = None,
    ):
        """
        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
        """
        return self.post(
            "/teams/activities",
            json={
                "conversation_id": conversation_id,
                "text": text,
                "success": success,
                "card_block": card_block,
            },
        )
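    # Usage sketch (hypothetical conversation id): the message is posted through
    # the global /teams/activities endpoint, so no workspace prefix is needed.
    #
    #     client.send_teams_message(
    #         conversation_id="19:abc123@thread.tacv2",
    #         text="Deployment finished",
    #         success=True,
    #     )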
def deprecate(in_favor_of: str):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            warnings.warn(
                (
                    f"The '{f.__name__}' method is deprecated and may be removed in the future. "
                    f"Consider {in_favor_of}"
                ),
                DeprecationWarning,
            )
            return f(*args, **kwargs)

        return wrapper

    return decorator
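# Usage sketch: the wrapped callable is unchanged except that each call emits a
# DeprecationWarning naming the suggested replacement (the function below is
# hypothetical; see get_state further down for a real use).
#
#     @deprecate("using get_variable instead")
#     def legacy_get_variable(path: str) -> str:
#         return get_variable(path)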
@init_global_client
def run_script_async(
    hash_or_path: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: int = None,
) -> str:
    is_path = "/" in hash_or_path
    hash_ = None if is_path else hash_or_path
    path = hash_or_path if is_path else None
    return _client.run_script_async(
        hash_=hash_,
        path=path,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
    )
@init_global_client
def run_flow_async(
    path: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: int = None,
    # can only be set to False if the job will be fully awaited and not run concurrently
    # with any other job, as otherwise the child flow and its own children would store
    # their state in the parent job, which would lead to incorrectness and failures
    do_not_track_in_parent: bool = True,
) -> str:
    return _client.run_flow_async(
        path=path,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
        do_not_track_in_parent=do_not_track_in_parent,
    )
@init_global_client
def run_script_sync(
    hash: str,
    args: Dict[str, Any] = None,
    verbose: bool = False,
    assert_result_is_not_none: bool = True,
    cleanup: bool = True,
    timeout: dt.timedelta = None,
) -> Any:
    return _client.run_script(
        hash_=hash,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )
@init_global_client
def run_script_by_hash_async(
    hash_: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: Union[None, int] = None,
) -> str:
    return _client.run_script_by_hash_async(
        hash_=hash_,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
    )
@init_global_client
def run_script_by_path_sync(
    path: str,
    args: Dict[str, Any] = None,
    verbose: bool = False,
    assert_result_is_not_none: bool = True,
    cleanup: bool = True,
    timeout: dt.timedelta = None,
) -> Any:
    return _client.run_script(
        path=path,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )
@init_global_client
def get_id_token(audience: str) -> str:
    """
    Get a JWT token for the given audience for OIDC purposes, to log in to third parties like AWS, Vault, GCP, etc.
    """
    return _client.get_id_token(audience)
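# Usage sketch: the returned JWT is typically exchanged with an OIDC-federated
# third party; the audience string below is hypothetical and must match the
# relying party's configuration.
#
#     token = wmill.get_id_token("aws")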
@init_global_client
def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection from DuckDB
    """
    return _client.get_duckdb_connection_settings(s3_resource_path)
@init_global_client
def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection from Polars
    """
    return _client.get_polars_connection_settings(s3_resource_path)
@init_global_client
def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection using boto3
    """
    return _client.get_boto3_connection_settings(s3_resource_path)
@init_global_client
def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes:
    """
    Load the entire content of a file stored in S3 as bytes
    """
    return _client.load_s3_file(
        s3object, s3_resource_path if s3_resource_path != "" else None
    )
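# Usage sketch (hypothetical key): the whole object is returned as a single bytes
# value, so prefer load_s3_file_reader below when streaming large files.
#
#     raw = wmill.load_s3_file(S3Object(s3="examples/hello.txt"))
#     text = raw.decode("utf-8")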
@init_global_client
def load_s3_file_reader(
    s3object: S3Object | str, s3_resource_path: str | None = None
) -> BufferedReader:
    """
    Load the content of a file stored in S3
    """
    return _client.load_s3_file_reader(
        s3object, s3_resource_path if s3_resource_path != "" else None
    )
@init_global_client
def write_s3_file(
    s3object: S3Object | str | None,
    file_content: BufferedReader | bytes,
    s3_resource_path: str | None = None,
    content_type: str | None = None,
    content_disposition: str | None = None,
) -> S3Object:
    """
    Upload a file to S3

    Content type will be automatically guessed from path extension if left empty

    See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
    and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
    """
    return _client.write_s3_file(
        s3object,
        file_content,
        s3_resource_path if s3_resource_path != "" else None,
        content_type,
        content_disposition,
    )
@init_global_client
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]:
    """
    Sign S3 objects to be used by anonymous users in public apps
    Returns a list of signed s3 tokens
    """
    return _client.sign_s3_objects(s3_objects)
@init_global_client
def sign_s3_object(s3_object: S3Object | str) -> S3Object:
    """
    Sign an S3 object to be used by anonymous users in public apps
    Returns a signed s3 object
    """
    return _client.sign_s3_object(s3_object)
@init_global_client
def get_presigned_s3_public_urls(
    s3_objects: list[S3Object | str],
    base_url: str | None = None,
) -> list[str]:
    """
    Generate presigned public URLs for an array of S3 objects.
    If an S3 object is not signed yet, it will be signed first.

    Args:
        s3_objects: List of S3 objects to sign
        base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

    Returns:
        List of signed public URLs

    Example:
    >>> import wmill
    >>> from wmill import S3Object
    >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
    >>> urls = wmill.get_presigned_s3_public_urls(s3_objs)
    """
    return _client.get_presigned_s3_public_urls(s3_objects, base_url)
@init_global_client
def get_presigned_s3_public_url(
    s3_object: S3Object | str,
    base_url: str | None = None,
) -> str:
    """
    Generate a presigned public URL for an S3 object.
    If the S3 object is not signed yet, it will be signed first.

    Args:
        s3_object: S3 object to sign
        base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

    Returns:
        Signed public URL

    Example:
    >>> import wmill
    >>> from wmill import S3Object
    >>> s3_obj = S3Object(s3="/path/to/file.txt")
    >>> url = wmill.get_presigned_s3_public_url(s3_obj)
    """
    return _client.get_presigned_s3_public_url(s3_object, base_url)
@init_global_client
def whoami() -> dict:
    """
    Returns the current user
    """
    return _client.user
@init_global_client
@deprecate("Windmill().state")
def get_state() -> Any:
    """
    Get the state
    """
    return _client.state
@init_global_client
def get_resource(
    path: str,
    none_if_undefined: bool = False,
) -> dict | None:
    """Get resource from Windmill"""
    return _client.get_resource(path, none_if_undefined)
@init_global_client
def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
    """
    Set the resource at a given path as a string, creating it if it does not exist
    """
    return _client.set_resource(value=value, path=path, resource_type=resource_type)
@init_global_client
def list_resources(
    resource_type: str = None,
    page: int = None,
    per_page: int = None,
) -> list[dict]:
    """List resources from the Windmill workspace.

    Args:
        resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
        page: Optional page number for pagination
        per_page: Optional number of results per page

    Returns:
        List of resource dictionaries

    Example:
    >>> # Get all resources
    >>> all_resources = wmill.list_resources()

    >>> # Get only PostgreSQL resources
    >>> pg_resources = wmill.list_resources(resource_type="postgresql")
    """
    return _client.list_resources(
        resource_type=resource_type,
        page=page,
        per_page=per_page,
    )
@init_global_client
def set_state(value: Any) -> None:
    """
    Set the state
    """
    return _client.set_state(value)
@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
    """
    Set the progress
    """
    return _client.set_progress(value, job_id)
@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
    """
    Get the progress
    """
    return _client.get_progress(job_id)
@init_global_client
def get_variable(path: str) -> str:
    """
    Returns the variable at a given path as a string
    """
    return _client.get_variable(path)
@init_global_client
def set_variable(path: str, value: str, is_secret: bool = False) -> None:
    """
    Set the variable at a given path as a string, creating it if it does not exist
    """
    return _client.set_variable(path, value, is_secret)
@init_global_client
def get_flow_user_state(key: str) -> Any:
    """
    Get the user state of a flow at a given key
    """
    return _client.get_flow_user_state(key)
@init_global_client
def set_flow_user_state(key: str, value: Any) -> None:
    """
    Set the user state of a flow at a given key
    """
    return _client.set_flow_user_state(key, value)
@init_global_client
def request_interactive_slack_approval(
    slack_resource_path: str,
    channel_id: str,
    message: str = None,
    approver: str = None,
    default_args_json: dict = None,
    dynamic_enums_json: dict = None,
) -> None:
    return _client.request_interactive_slack_approval(
        slack_resource_path=slack_resource_path,
        channel_id=channel_id,
        message=message,
        approver=approver,
        default_args_json=default_args_json,
        dynamic_enums_json=dynamic_enums_json,
    )
@init_global_client
def cancel_job(job_id: str, reason: str = None) -> str:
    """Cancel a specific job by ID.

    Args:
        job_id: UUID of the job to cancel
        reason: Optional reason for cancellation

    Returns:
        Response message from the cancel endpoint
    """
    return _client.cancel_job(job_id, reason)
@init_global_client
def cancel_running() -> dict:
    """Cancel currently running executions of the same script."""
    return _client.cancel_running()
@init_global_client
def run_script(
    path: str = None,
    hash_: str = None,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script synchronously and return its result.

    .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
    """
    return _client.run_script(
        path=path,
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )
@init_global_client
def run_script_by_path(
    path: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by path synchronously and return its result."""
    return _client.run_script_by_path(
        path=path,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )
@init_global_client
def run_script_by_hash(
    hash_: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by hash synchronously and return its result."""
    return _client.run_script_by_hash(
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )
@init_global_client
def run_inline_script_preview(
    content: str,
    language: str,
    args: dict = None,
) -> Any:
    """Run a script on the current worker without creating a job"""
    return _client.run_inline_script_preview(
        content=content,
        language=language,
        args=args,
    )
@init_global_client
def username_to_email(username: str) -> str:
    """
    Get email from workspace username.
    This method is particularly useful for apps that require the email address of the viewer.
    Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
    """
    return _client.username_to_email(username)
def task(*args, **kwargs):
    from inspect import signature

    def f(func, tag: str | None = None):
        if (
            os.environ.get("WM_JOB_ID") is None
            or os.environ.get("MAIN_OVERRIDE") == func.__name__
        ):

            def inner(*args, **kwargs):
                return func(*args, **kwargs)

            return inner
        else:

            def inner(*args, **kwargs):
                global _client
                if _client is None:
                    _client = Windmill()
                w_id = os.environ.get("WM_WORKSPACE")
                job_id = os.environ.get("WM_JOB_ID")
                f_name = func.__name__
                # map positional arguments onto the function's parameter names,
                # without overriding explicitly passed keyword arguments
                payload = kwargs
                param_names = list(signature(func).parameters)
                for i, arg in enumerate(args):
                    if i < len(param_names):
                        key = param_names[i]
                        if key not in kwargs:
                            payload[key] = arg

                query_params = {}
                if tag is not None:
                    query_params["tag"] = tag
                w_as_code_response = _client.post(
                    f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}",
                    json={"args": payload},
                    params=query_params,
                )
                job_id = w_as_code_response.text
                print(f"Executing task {func.__name__} on job {job_id}")
                job_result = _client.wait_job(job_id)
                print(f"Task {func.__name__} ({job_id}) completed")
                return job_result

            return inner

    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        return f(args[0], None)
    else:
        return lambda x: f(x, kwargs.get("tag"))
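# Usage sketch (workflow as code; the function and tag below are hypothetical):
# outside a Windmill job (no WM_JOB_ID) the decorated function runs inline;
# inside a job, each call is submitted as a separate child job and awaited.
#
#     @task(tag="heavy")
#     def crunch(n: int) -> int:
#         return n * n
#
#     def main():
#         return crunch(4)  # runs as its own job when executed on Windmill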
def parse_resource_syntax(s: str) -> Optional[str]:
    """Parse resource syntax from string."""
    if s is None:
        return None
    if s.startswith("$res:"):
        return s[5:]
    if s.startswith("res://"):
        return s[6:]
    return None
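# For example (directly derivable from the prefixes above; the path is hypothetical):
#
#     assert parse_resource_syntax("$res:u/user/my_resource") == "u/user/my_resource"
#     assert parse_resource_syntax("res://u/user/my_resource") == "u/user/my_resource"
#     assert parse_resource_syntax("u/user/my_resource") is None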
def parse_s3_object(s3_object: S3Object | str) -> S3Object:
    """Parse S3 object from string or S3Object format."""
    if isinstance(s3_object, str):
        match = re.match(r"^s3://([^/]*)/(.*)$", s3_object)
        if match:
            return S3Object(s3=match.group(2) or "", storage=match.group(1) or None)
        return S3Object(s3="")
    else:
        return s3_object
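# For example (hypothetical keys; assuming S3Object compares dict-like):
# "s3://<storage>/<key>" selects a secondary storage, and an empty storage
# segment maps to the default one.
#
#     assert parse_s3_object("s3://archive/logs/a.txt") == S3Object(s3="logs/a.txt", storage="archive")
#     assert parse_s3_object("s3:///logs/a.txt") == S3Object(s3="logs/a.txt", storage=None)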
def parse_variable_syntax(s: str) -> Optional[str]:
    """Parse variable syntax from string."""
    if s.startswith("var://"):
        return s[6:]
    return None
def append_to_result_stream(text: str) -> None:
    """Append a text to the result stream.

    Args:
        text: text to append to the result stream
    """
    print("WM_STREAM: {}".format(text.replace(chr(10), "\\n")))
def stream_result(stream) -> None:
    """Stream to the result stream.

    Args:
        stream: iterable of text chunks to forward to the result stream
    """
    for text in stream:
        append_to_result_stream(text)
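# Usage sketch: both helpers print "WM_STREAM: "-prefixed lines to stdout
# (escaping embedded newlines), which Windmill picks up as the incremental
# result stream of the current job.
#
#     append_to_result_stream("step 1 done")
#     stream_result(f"processed chunk {i}" for i in range(3))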
class DataTableClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, *args):
        args_dict = {}
        args_def = ""
        for i, arg in enumerate(args):
            args_dict[f"arg{i+1}"] = arg
            args_def += f"-- ${i+1} arg{i+1}\n"
        sql = args_def + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="postgresql",
                args={"database": f"datatable://{self.name}", **args_dict},
            ),
        )
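# Usage sketch (hypothetical data table and SQL): positional arguments are
# declared as $1, $2, ... header comments and passed as arg1, arg2, ..., so the
# SQL runs as a parameterized inline "postgresql" script; fetch_one comes from
# SqlQuery below.
#
#     dt = DataTableClient(client, "my_datatable")
#     row = dt.query("SELECT * FROM users WHERE id = $1", 42).fetch_one()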
class DucklakeClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, **kwargs):
        args_dict = {}
        args_def = ""
        for key, value in kwargs.items():
            args_dict[key] = value
            args_def += f"-- ${key} ({infer_sql_type(value)})\n"
        attach = f"ATTACH 'ducklake://{self.name}' AS dl;USE dl;\n"
        sql = args_def + attach + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="duckdb",
                args=args_dict,
            ),
        )
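# Usage sketch (hypothetical ducklake name and SQL): keyword arguments are
# declared with their inferred SQL types (see infer_sql_type below) and the
# ducklake catalog is attached before the query runs.
#
#     dl = DucklakeClient(client, "my_ducklake")
#     rows = dl.query("SELECT * FROM events WHERE kind = $kind", kind="click").fetch()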
class SqlQuery:
    def __init__(self, sql: str, fetch_fn):
        self.sql = sql
        self.fetch_fn = fetch_fn

    def fetch(self, result_collection: str | None = None):
        sql = self.sql
        if result_collection is not None:
            sql = f"-- result_collection={result_collection}\n{sql}"
        return self.fetch_fn(sql)

    def fetch_one(self):
        return self.fetch(result_collection="last_statement_first_row")
def infer_sql_type(value) -> str:
    """
    The DuckDB executor requires explicit argument types at declaration.
    These types exist in both DuckDB and Postgres.
    Check that the types exist if you plan to extend this function for other SQL engines.
    """
    if isinstance(value, bool):
        # Check bool before int since bool is a subclass of int in Python
        return "BOOLEAN"
    elif isinstance(value, int):
        return "BIGINT"
    elif isinstance(value, float):
        return "DOUBLE PRECISION"
    elif value is None:
        return "TEXT"
    elif isinstance(value, str):
        return "TEXT"
    elif isinstance(value, dict) or isinstance(value, list):
        return "JSON"
    else:
        return "TEXT"
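# For example (directly derivable from the branches above):
#
#     assert infer_sql_type(True) == "BOOLEAN"  # bool is checked before int
#     assert infer_sql_type(3) == "BIGINT"
#     assert infer_sql_type(1.5) == "DOUBLE PRECISION"
#     assert infer_sql_type({"a": 1}) == "JSON"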