wmill.client

from __future__ import annotations

import atexit
import datetime as dt
import functools
from io import BufferedReader, BytesIO
import logging
import os
import random
import time
import warnings
import json
from json import JSONDecodeError
from typing import Dict, Any, Union, Literal, Optional
import re

import httpx

from .s3_reader import S3BufferedReader, bytes_generator
from .s3_types import (
    Boto3ConnectionSettings,
    DuckDbConnectionSettings,
    PolarsConnectionSettings,
    S3Object,
)

_client: "Windmill | None" = None

logger = logging.getLogger("windmill_client")

JobStatus = Literal["RUNNING", "WAITING", "COMPLETED"]


class Windmill:
    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
        base = (
            base_url
            or os.environ.get("BASE_INTERNAL_URL")
            or os.environ.get("WM_BASE_URL")
        )

        self.base_url = f"{base}/api"
        self.token = token or os.environ.get("WM_TOKEN")
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}",
        }
        self.verify = verify
        self.client = self.get_client()
        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
        self.path = os.environ.get("WM_JOB_PATH")

        self.mocked_api = self.get_mocked_api()

        assert self.workspace, (
            "workspace required as an argument or as WM_WORKSPACE environment variable"
        )

    def get_mocked_api(self) -> Optional[dict]:
        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
        if not mocked_path:
            return None
        logger.info("Using mocked API from %s", mocked_path)
        mocked_api = {"variables": {}, "resources": {}}
        try:
            with open(mocked_path, "r") as f:
                incoming_mocked_api = json.load(f)
            mocked_api = {**mocked_api, **incoming_mocked_api}
        except Exception as e:
            logger.warning(
                "Error parsing mocked API file at path %s. Using empty mocked API.",
                mocked_path,
            )
            logger.debug(e)
        return mocked_api

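    # Illustrative sketch (not part of the source, paths hypothetical): the
    # file pointed to by WM_MOCKED_API_FILE is expected to contain JSON with
    # top-level "variables" and "resources" mappings keyed by path, which
    # get_variable/get_resource consult before falling back to the real API:
    #
    #   {
    #     "variables": {"u/alice/my_var": "mocked value"},
    #     "resources": {"u/alice/my_db": {"host": "localhost", "port": 5432}}
    #   }
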
    def get_client(self) -> httpx.Client:
        return httpx.Client(
            base_url=self.base_url,
            headers=self.headers,
            verify=self.verify,
        )

    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.get(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp

    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
        endpoint = endpoint.lstrip("/")
        resp = self.client.post(f"/{endpoint}", **kwargs)
        if raise_for_status:
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as err:
                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
                logger.error(error)
                raise Exception(error)
        return resp

    def create_token(self, duration=dt.timedelta(days=1)) -> str:
        endpoint = "/users/tokens/create"
        payload = {
            "label": f"refresh {time.time()}",
            "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        return self.post(endpoint, json=payload).text

    def run_script_async(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job and return its job id.

        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
        """
        logger.warning(
            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)

    def _run_script_async_internal(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Internal helper for running scripts asynchronously."""
        args = args or {}
        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
        if os.environ.get("WM_JOB_ID"):
            params["parent_job"] = os.environ.get("WM_JOB_ID")
        if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
            params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")

        if path:
            endpoint = f"/w/{self.workspace}/jobs/run/p/{path}"
        elif hash_:
            endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}"
        else:
            raise Exception("path or hash_ must be provided")

        return self.post(endpoint, json=args, params=params).text

    def run_script_by_path_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by path and return its job id."""
        return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)

    def run_script_by_hash_async(
        self,
        hash_: str,
        args: dict = None,
        scheduled_in_secs: int = None,
    ) -> str:
        """Create a script job by hash and return its job id."""
        return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)

    def run_flow_async(
        self,
        path: str,
        args: dict = None,
        scheduled_in_secs: int = None,
        # Can only be set to False if the job will be fully awaited and not run
        # concurrently with any other job: otherwise the child flow and its own
        # children would store their state in the parent job, leading to
        # incorrectness and failures.
        do_not_track_in_parent: bool = True,
    ) -> str:
        """Create a flow job and return its job id."""
        args = args or {}
        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
        if not do_not_track_in_parent:
            if os.environ.get("WM_JOB_ID"):
                params["parent_job"] = os.environ.get("WM_JOB_ID")
            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
        if path:
            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
        else:
            raise Exception("path must be provided")
        return self.post(endpoint, json=args, params=params).text

    def run_script(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script synchronously and return its result.

        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
        """
        logger.warning(
            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
        )
        assert not (path and hash_), "path and hash_ are mutually exclusive"
        return self._run_script_internal(
            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )

    def _run_script_internal(
        self,
        path: str = None,
        hash_: str = None,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Internal helper for running scripts synchronously."""
        args = args or {}

        if verbose:
            if path:
                logger.info(f"running `{path}` synchronously with {args = }")
            elif hash_:
                logger.info(f"running script with hash `{hash_}` synchronously with {args = }")

        if isinstance(timeout, dt.timedelta):
            timeout = timeout.total_seconds()

        job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args)
        return self.wait_job(
            job_id, timeout, verbose, cleanup, assert_result_is_not_none
        )

    def run_script_by_path(
        self,
        path: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by path synchronously and return its result."""
        return self._run_script_internal(
            path=path, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )

    def run_script_by_hash(
        self,
        hash_: str,
        args: dict = None,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ) -> Any:
        """Run script by hash synchronously and return its result."""
        return self._run_script_internal(
            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
        )

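    # Usage sketch (hypothetical path and hash, assuming a configured client):
    #
    #   client = Windmill()
    #   result = client.run_script_by_path("u/alice/my_script", args={"n": 3})
    #   # or pin an exact script version by hash:
    #   result = client.run_script_by_hash("abc123", args={"n": 3})
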
    def run_inline_script_preview(
        self,
        content: str,
        language: str,
        args: dict = None,
    ) -> Any:
        """Run a script on the current worker without creating a job"""
        endpoint = f"/w/{self.workspace}/jobs/run_inline/preview"
        body = {
            "content": content,
            "language": language,
            "args": args or {},
        }
        return self.post(endpoint, json=body).text

    def wait_job(
        self,
        job_id,
        timeout: dt.timedelta | int | float | None = None,
        verbose: bool = False,
        cleanup: bool = True,
        assert_result_is_not_none: bool = False,
    ):
        def cancel_job():
            logger.warning(f"cancelling job: {job_id}")
            self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                json={"reason": "parent script cancelled"},
            ).raise_for_status()

        if cleanup:
            atexit.register(cancel_job)

        start_time = time.time()

        if isinstance(timeout, dt.timedelta):
            timeout = timeout.total_seconds()

        while True:
            result_res = self.get(
                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
            ).json()

            started = result_res["started"]
            completed = result_res["completed"]
            success = result_res["success"]

            if not started and verbose:
                logger.info(f"job {job_id} has not started yet")

            if cleanup and completed:
                atexit.unregister(cancel_job)

            if completed:
                result = result_res["result"]
                if success:
                    if result is None and assert_result_is_not_none:
                        raise Exception("Result was none")
                    return result
                else:
                    error = result["error"]
                    raise Exception(f"Job {job_id} was not successful: {str(error)}")

            if timeout and ((time.time() - start_time) > timeout):
                msg = "reached timeout"
                logger.warning(msg)
                self.post(
                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
                    json={"reason": msg},
                )
                raise TimeoutError(msg)
            if verbose:
                logger.info(f"sleeping 0.5 seconds for {job_id = }")

            time.sleep(0.5)

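    # Usage sketch (hypothetical path): start a job asynchronously, then block
    # on it. wait_job polls get_result_maybe every 0.5s, cancels the job on
    # timeout, and raises if the job failed.
    #
    #   job_id = client.run_script_by_path_async("u/alice/my_script", args={})
    #   result = client.wait_job(job_id, timeout=dt.timedelta(minutes=5))
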
    def cancel_job(self, job_id: str, reason: str = None) -> str:
        """Cancel a specific job by ID.

        Args:
            job_id: UUID of the job to cancel
            reason: Optional reason for cancellation

        Returns:
            Response message from the cancel endpoint
        """
        logger.info(f"cancelling job: {job_id}")

        payload = {"reason": reason or "cancelled via cancel_job method"}

        response = self.post(
            f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
            json=payload,
        )

        return response.text

    def cancel_running(self) -> dict:
        """Cancel currently running executions of the same script."""
        logger.info("canceling running executions of this script")

        jobs = self.get(
            f"/w/{self.workspace}/jobs/list",
            params={
                "running": "true",
                "script_path_exact": self.path,
            },
        ).json()

        current_job_id = os.environ.get("WM_JOB_ID")

        logger.debug(f"{current_job_id = }")

        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]

        if job_ids:
            logger.info(f"cancelling the following job ids: {job_ids}")
        else:
            logger.info("no previous executions to cancel")

        result = {}

        for id_ in job_ids:
            result[id_] = self.post(
                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
                json={"reason": "killed by `cancel_running` method"},
            )

        return result

    def get_job(self, job_id: str) -> dict:
        return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json()

    def get_root_job_id(self, job_id: str | None = None) -> str:
        job_id = job_id or os.environ.get("WM_JOB_ID")
        return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json()

    def get_id_token(self, audience: str, expires_in: int | None = None) -> str:
        params = {}
        if expires_in is not None:
            params["expires_in"] = expires_in
        return self.post(f"/w/{self.workspace}/oidc/token/{audience}", params=params).text

    def get_job_status(self, job_id: str) -> JobStatus:
        job = self.get_job(job_id)
        job_type = job.get("type", "")
        assert job_type, f"{job} is not a valid job"
        if job_type.lower() == "completedjob":
            return "COMPLETED"
        if job.get("running"):
            return "RUNNING"
        return "WAITING"

    def get_result(
        self,
        job_id: str,
        assert_result_is_not_none: bool = True,
    ) -> Any:
        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
        result_text = result.text
        if assert_result_is_not_none and result_text is None:
            raise Exception(f"result is None for {job_id = }")
        try:
            return result.json()
        except JSONDecodeError:
            return result_text

    def get_variable(self, path: str) -> str:
        """Get variable from Windmill"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            variables = self.mocked_api["variables"]
            try:
                result = variables[path]
                return result
            except KeyError:
                logger.info(
                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
                )

        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()

    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
        """Set variable in Windmill"""
        path = parse_variable_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["variables"][path] = value
            return

        # check if variable exists
        r = self.get(
            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create variable
            self.post(
                f"/w/{self.workspace}/variables/create",
                json={
                    "path": path,
                    "value": value,
                    "is_secret": is_secret,
                    "description": "",
                },
            )
        else:
            # update variable
            self.post(
                f"/w/{self.workspace}/variables/update/{path}",
                json={"value": value},
            )

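    # Usage sketch (hypothetical path): set_variable creates the variable on
    # first write and updates it afterwards.
    #
    #   client.set_variable("u/alice/api_key", "secret-value", is_secret=True)
    #   api_key = client.get_variable("u/alice/api_key")
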
    def get_resource(
        self,
        path: str,
        none_if_undefined: bool = False,
    ) -> dict | None:
        """Get resource from Windmill"""
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            resources = self.mocked_api["resources"]
            try:
                result = resources[path]
                return result
            except KeyError:
                # NOTE: should mocked_api respect `none_if_undefined`?
                if none_if_undefined:
                    logger.info(
                        f"resource not found at {path}, but none_if_undefined is True, so returning None"
                    )
                    return None
                logger.info(
                    f"MockedAPI present, but resource not found at {path}, falling back to real API"
                )

        try:
            return self.get(
                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
            ).json()
        except Exception as e:
            if none_if_undefined:
                return None
            logger.error(e)
            raise e

    def set_resource(
        self,
        value: Any,
        path: str,
        resource_type: str,
    ):
        path = parse_resource_syntax(path) or path
        if self.mocked_api is not None:
            self.mocked_api["resources"][path] = value
            return

        # check if resource exists
        r = self.get(
            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
        )
        if r.status_code == 404:
            # create resource
            self.post(
                f"/w/{self.workspace}/resources/create",
                json={
                    "path": path,
                    "value": value,
                    "resource_type": resource_type,
                },
            )
        else:
            # update resource
            self.post(
                f"/w/{self.workspace}/resources/update_value/{path}",
                json={"value": value},
            )

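    # Usage sketch (hypothetical path and value): like set_variable,
    # set_resource creates the resource if missing and updates its value
    # otherwise.
    #
    #   client.set_resource(
    #       {"host": "db.internal", "port": 5432},
    #       path="u/alice/my_db",
    #       resource_type="postgresql",
    #   )
    #   db = client.get_resource("u/alice/my_db")
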
    def list_resources(
        self,
        resource_type: str = None,
        page: int = None,
        per_page: int = None,
    ) -> list[dict]:
        """List resources from Windmill workspace.

        Args:
            resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
            page: Optional page number for pagination
            per_page: Optional number of results per page

        Returns:
            List of resource dictionaries
        """
        params = {}
        if resource_type is not None:
            params["resource_type"] = resource_type
        if page is not None:
            params["page"] = page
        if per_page is not None:
            params["per_page"] = per_page

        return self.get(
            f"/w/{self.workspace}/resources/list",
            params=params if params else None,
        ).json()

    def set_state(self, value: Any):
        self.set_resource(value, path=self.state_path, resource_type="state")

    def set_progress(self, value: int, job_id: Optional[str] = None):
        workspace = get_workspace()
        flow_id = os.environ.get("WM_FLOW_JOB_ID")
        job_id = job_id or os.environ.get("WM_JOB_ID")

        if job_id is not None:
            job = self.get_job(job_id)
            flow_id = job.get("parent_job")

        self.post(
            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
            json={
                "percent": value,
                "flow_job_id": flow_id or None,
            },
        )

    def get_progress(self, job_id: Optional[str] = None) -> Any:
        workspace = get_workspace()
        job_id = job_id or os.environ.get("WM_JOB_ID")

        r = self.get(
            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {job_id} does not exist")
            return None
        else:
            return r.json()

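    # Usage sketch: report progress (in percent) from within a running job;
    # when job_id is omitted, the current job (WM_JOB_ID) is used.
    #
    #   client.set_progress(25)
    #   ...  # do more work
    #   client.set_progress(100)
    #   pct = client.get_progress()
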
    def set_flow_user_state(self, key: str, value: Any) -> None:
        """Set the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.post(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            json=value,
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")

    def get_flow_user_state(self, key: str) -> Any:
        """Get the user state of a flow at a given key"""
        flow_id = self.get_root_job_id()
        r = self.get(
            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
            raise_for_status=False,
        )
        if r.status_code == 404:
            print(f"Job {flow_id} does not exist or is not a flow")
            return None
        else:
            return r.json()

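    # Usage sketch (hypothetical key): flow user state is a small key/value
    # store scoped to the root flow job, useful for sharing data across steps.
    #
    #   client.set_flow_user_state("customers_seen", [1, 2, 3])
    #   seen = client.get_flow_user_state("customers_seen")
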
    @property
    def version(self):
        return self.get("version").text

    def get_duckdb_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> DuckDbConnectionSettings | None:
        """
        Convenience helper that takes an S3 resource as input and returns the settings necessary to
        initiate an S3 connection from DuckDB
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            raw_obj = self.post(
                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return DuckDbConnectionSettings(raw_obj)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate DuckDB S3 connection settings from the provided resource"
            ) from e

    def get_polars_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> PolarsConnectionSettings:
        """
        Convenience helper that takes an S3 resource as input and returns the settings necessary to
        initiate an S3 connection from Polars
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            raw_obj = self.post(
                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return PolarsConnectionSettings(raw_obj)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate Polars S3 connection settings from the provided resource"
            ) from e

    def get_boto3_connection_settings(
        self,
        s3_resource_path: str = "",
    ) -> Boto3ConnectionSettings:
        """
        Convenience helper that takes an S3 resource as input and returns the settings necessary to
        initiate an S3 connection using boto3
        """
        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
        try:
            s3_resource = self.post(
                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
                json={}
                if s3_resource_path == ""
                else {"s3_resource_path": s3_resource_path},
            ).json()
            return self.__boto3_connection_settings(s3_resource)
        except JSONDecodeError as e:
            raise Exception(
                "Could not generate Boto3 S3 connection settings from the provided resource"
            ) from e

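    # Usage sketch (hypothetical resource path; assumes boto3 is installed and
    # that Boto3ConnectionSettings exposes the kwargs built in
    # __boto3_connection_settings below as a mapping):
    #
    #   import boto3
    #   settings = client.get_boto3_connection_settings("u/alice/my_s3")
    #   s3 = boto3.client("s3", **dict(settings))
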
    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
        """
        Load a file from the workspace s3 bucket and return its content as bytes.

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")
        my_obj_content = client.load_s3_file(s3_obj)
        file_content = my_obj_content.decode("utf-8")
        '''
        """
        s3object = parse_s3_object(s3object)
        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
            return file_reader.read()

    def load_s3_file_reader(
        self, s3object: S3Object | str, s3_resource_path: str | None
    ) -> BufferedReader:
        """
        Load a file from the workspace s3 bucket and return it as a bytes stream.

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")
        with wmill.load_s3_file_reader(s3_obj, s3_resource_path) as file_reader:
            print(file_reader.read())
        '''
        """
        s3object = parse_s3_object(s3object)
        reader = S3BufferedReader(
            f"{self.workspace}",
            self.client,
            s3object["s3"],
            s3_resource_path,
            s3object["storage"] if "storage" in s3object else None,
        )
        return reader

    def write_s3_file(
        self,
        s3object: S3Object | str | None,
        file_content: BufferedReader | bytes,
        s3_resource_path: str | None,
        content_type: str | None = None,
        content_disposition: str | None = None,
    ) -> S3Object:
        """
        Write a file to the workspace S3 bucket

        '''python
        from wmill import S3Object

        s3_obj = S3Object(s3="/path/to/my_file.txt")

        # for an in memory bytes array:
        file_content = b'Hello Windmill!'
        client.write_s3_file(s3_obj, file_content)

        # for a file:
        with open("my_file.txt", "rb") as my_file:
            client.write_s3_file(s3_obj, my_file)
        '''
        """
        s3object = parse_s3_object(s3object)
        # httpx accepts either bytes or a bytes generator as content. If it's a
        # BufferedReader, we need to convert it to a generator
        if isinstance(file_content, BufferedReader):
            content_payload = bytes_generator(file_content)
        elif isinstance(file_content, bytes):
            content_payload = file_content
        else:
            raise Exception("Type of file_content not supported")

        query_params = {}
        if s3object is not None and s3object["s3"] != "":
            query_params["file_key"] = s3object["s3"]
        if s3_resource_path is not None and s3_resource_path != "":
            query_params["s3_resource_path"] = s3_resource_path
        if (
            s3object is not None
            and "storage" in s3object
            and s3object["storage"] is not None
        ):
            query_params["storage"] = s3object["storage"]
        if content_type is not None:
            query_params["content_type"] = content_type
        if content_disposition is not None:
            query_params["content_disposition"] = content_disposition

        try:
            # need a vanilla client b/c content-type is not application/json here
            response = httpx.post(
                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
                headers={
                    "Authorization": f"Bearer {self.token}",
                    "Content-Type": "application/octet-stream",
                },
                params=query_params,
                content=content_payload,
                verify=self.verify,
                timeout=None,
            ).json()
        except Exception as e:
            raise Exception("Could not write file to S3") from e
        return S3Object(s3=response["file_key"])

    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
        return self.post(
            f"/w/{self.workspace}/apps/sign_s3_objects",
            json={"s3_objects": list(map(parse_s3_object, s3_objects))},
        ).json()

    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
        return self.post(
            f"/w/{self.workspace}/apps/sign_s3_objects",
            json={"s3_objects": [parse_s3_object(s3_object)]},
        ).json()[0]

    def get_presigned_s3_public_urls(
        self,
        s3_objects: list[S3Object | str],
        base_url: str | None = None,
    ) -> list[str]:
        """
        Generate presigned public URLs for an array of S3 objects.
        If an S3 object is not signed yet, it will be signed first.

        Args:
            s3_objects: List of S3 objects to sign
            base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

        Returns:
            List of signed public URLs

        Example:
            >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
            >>> urls = client.get_presigned_s3_public_urls(s3_objs)
        """
        base_url = base_url or self._get_public_base_url()

        s3_objs = [parse_s3_object(s3_obj) for s3_obj in s3_objects]

        # Sign all S3 objects that need to be signed in one go
        s3_objs_to_sign: list[tuple[S3Object, int]] = [
            (s3_obj, index)
            for index, s3_obj in enumerate(s3_objs)
            if s3_obj.get("presigned") is None
        ]

        if s3_objs_to_sign:
            print(f"Signing {len(s3_objs_to_sign)} S3 objects...")
            signed_s3_objs = self.sign_s3_objects(
                [s3_obj for s3_obj, _ in s3_objs_to_sign]
            )
            for i, (_, original_index) in enumerate(s3_objs_to_sign):
                s3_objs[original_index] = parse_s3_object(signed_s3_objs[i])

        signed_urls: list[str] = []
        for s3_obj in s3_objs:
            s3 = s3_obj.get("s3", "")
            presigned = s3_obj.get("presigned", "")
            storage = s3_obj.get("storage", "_default_")
            signed_url = f"{base_url}/api/w/{self.workspace}/s3_proxy/{storage}/{s3}?{presigned}"
            signed_urls.append(signed_url)

        return signed_urls

    def get_presigned_s3_public_url(
        self,
        s3_object: S3Object | str,
        base_url: str | None = None,
    ) -> str:
        """
        Generate a presigned public URL for an S3 object.
        If the S3 object is not signed yet, it will be signed first.

        Args:
            s3_object: S3 object to sign
            base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

        Returns:
            Signed public URL

        Example:
            >>> s3_obj = S3Object(s3="/path/to/file.txt")
            >>> url = client.get_presigned_s3_public_url(s3_obj)
        """
        urls = self.get_presigned_s3_public_urls([s3_object], base_url)
        return urls[0]

    def _get_public_base_url(self) -> str:
        """Get the public base URL from environment or default to localhost"""
        return os.environ.get("WM_BASE_URL", "http://localhost:3000")

    def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings:
        endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://"
        return Boto3ConnectionSettings(
            {
                "endpoint_url": "{}{}".format(
                    endpoint_url_prefix, s3_resource["endPoint"]
                ),
                "region_name": s3_resource["region"],
                "use_ssl": s3_resource["useSSL"],
                "aws_access_key_id": s3_resource["accessKey"],
                "aws_secret_access_key": s3_resource["secretKey"],
                # no need for path_style here as boto3 is clever enough to determine which one to use
            }
        )

    def whoami(self) -> dict:
        return self.get("/users/whoami").json()

    @property
    def user(self) -> dict:
        return self.whoami()

    @property
    def state_path(self) -> str:
        state_path = os.environ.get(
            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
        )
        if state_path is None:
            raise Exception("State path not found")
        return state_path

    @property
    def state(self) -> Any:
        return self.get_resource(path=self.state_path, none_if_undefined=True)

    @state.setter
    def state(self, value: Any) -> None:
        self.set_state(value)

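    # Usage sketch: the state property reads and writes the resource at
    # state_path (WM_STATE_PATH), letting a script persist state across runs.
    #
    #   last_run = client.state        # None if never set
    #   client.state = {"cursor": 42}
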
    @staticmethod
    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
        """
        Set the state in the shared folder using pickle
        """
        import pickle

        with open(f"/shared/{path}", "wb") as handle:
            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
        """
        Get the state in the shared folder using pickle
        """
        import pickle

        with open(f"/shared/{path}", "rb") as handle:
            return pickle.load(handle)

    @staticmethod
    def set_shared_state(value: Any, path: str = "state.json") -> None:
        """
        Set the state in the shared folder using JSON
        """
        import json

        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
            json.dump(value, f, ensure_ascii=False, indent=4)

    @staticmethod
    def get_shared_state(path: str = "state.json") -> Any:
        """
        Get the state in the shared folder using JSON
        """
        import json

        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
            return json.load(f)

    def get_resume_urls(self, approver: str = None) -> dict:
        nonce = random.randint(0, 1000000000)
        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
        return self.get(
            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
            params={"approver": approver},
        ).json()

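    # Usage sketch: from a suspended flow step, fetch the resume endpoints and
    # hand them to an external approver (the exact keys of the returned dict
    # are defined by the resume_urls endpoint).
    #
    #   urls = client.get_resume_urls(approver="alice")
    #   notify_approver(urls)  # hypothetical helper
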
    def request_interactive_slack_approval(
        self,
        slack_resource_path: str,
        channel_id: str,
        message: str = None,
        approver: str = None,
        default_args_json: dict = None,
        dynamic_enums_json: dict = None,
    ) -> None:
        """
        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.

        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form

        :param slack_resource_path: The path to the Slack resource in Windmill.
        :type slack_resource_path: str
        :param channel_id: The Slack channel ID where the approval request will be sent.
        :type channel_id: str
        :param message: Optional custom message to include in the Slack approval request.
        :type message: str, optional
        :param approver: Optional user ID or name of the approver for the request.
        :type approver: str, optional
        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
        :type default_args_json: dict, optional
        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
        :type dynamic_enums_json: dict, optional

        :raises Exception: If the function is not called within a flow or flow preview.
        :raises Exception: If the required flow job or flow step environment variables are not set.

        :return: None

        **Usage Example:**
            >>> client.request_interactive_slack_approval(
            ...     slack_resource_path="/u/alex/my_slack_resource",
            ...     channel_id="admins-slack-channel",
            ...     message="Please approve this request",
            ...     approver="approver123",
            ...     default_args_json={"key1": "value1", "key2": 42},
            ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
            ... )

        **Notes:**
        - This function must be executed within a Windmill flow or flow preview.
        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
        """
        workspace = self.workspace
        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")

        if not flow_job_id:
            raise Exception(
                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
            )

        # Only include non-empty parameters
        params = {}
        if message:
            params["message"] = message
        if approver:
            params["approver"] = approver
        if slack_resource_path:
            params["slack_resource_path"] = slack_resource_path
        if channel_id:
            params["channel_id"] = channel_id
        if os.environ.get("WM_FLOW_STEP_ID"):
            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
        if default_args_json:
            params["default_args_json"] = json.dumps(default_args_json)
        if dynamic_enums_json:
            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)

        self.get(
            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
            params=params,
        )

    def username_to_email(self, username: str) -> str:
        """
        Get email from workspace username
        This method is particularly useful for apps that require the email address of the viewer.
        Indeed, in the viewer context WM_USERNAME is set to the username of the viewer, but WM_EMAIL is set to the email of the creator of the app.
        """
        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text

    def send_teams_message(
        self,
        conversation_id: str,
        text: str,
        success: bool = True,
        card_block: dict = None,
    ):
        """
        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
        """
        return self.post(
            "/teams/activities",
            json={
                "conversation_id": conversation_id,
                "text": text,
                "success": success,
                "card_block": card_block,
            },
        )

    def datatable(self, name: str = "main"):
        return DataTableClient(self, name)

    def ducklake(self, name: str = "main"):
        return DucklakeClient(self, name)


def init_global_client(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        global _client
        if _client is None:
            _client = Windmill()
        return f(*args, **kwargs)

    return wrapper


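# Usage sketch: the module-level helpers below lazily construct a single
# Windmill client from the environment (BASE_INTERNAL_URL/WM_TOKEN/
# WM_WORKSPACE), so inside a Windmill job they can be called directly:
#
#   import wmill
#   value = wmill.get_variable("u/alice/my_var")  # hypothetical path

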
def deprecate(in_favor_of: str):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            warnings.warn(
                (
                    f"The '{f.__name__}' method is deprecated and may be removed in the future. "
                    f"Consider {in_favor_of}"
                ),
                DeprecationWarning,
            )
            return f(*args, **kwargs)

        return wrapper

    return decorator


@init_global_client
def get_workspace() -> str:
    return _client.workspace


@init_global_client
def get_root_job_id(job_id: str | None = None) -> str:
    return _client.get_root_job_id(job_id)


@init_global_client
@deprecate("Windmill().version")
def get_version() -> str:
    return _client.version


@init_global_client
def run_script_async(
    hash_or_path: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: int = None,
) -> str:
    is_path = "/" in hash_or_path
    hash_ = None if is_path else hash_or_path
    path = hash_or_path if is_path else None
    return _client.run_script_async(
        hash_=hash_,
        path=path,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
    )


@init_global_client
def run_flow_async(
    path: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: int = None,
    # Can only be set to False if the job will be fully awaited and not run
    # concurrently with any other job: otherwise the child flow and its own
    # children would store their state in the parent job, leading to
    # incorrectness and failures.
    do_not_track_in_parent: bool = True,
) -> str:
    return _client.run_flow_async(
        path=path,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
        do_not_track_in_parent=do_not_track_in_parent,
    )


@init_global_client
def run_script_sync(
    hash: str,
    args: Dict[str, Any] = None,
    verbose: bool = False,
    assert_result_is_not_none: bool = True,
    cleanup: bool = True,
    timeout: dt.timedelta = None,
) -> Any:
    return _client.run_script(
        hash_=hash,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )


@init_global_client
def run_script_by_path_async(
    path: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: Union[None, int] = None,
) -> str:
    return _client.run_script_by_path_async(
        path=path,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
    )


@init_global_client
def run_script_by_hash_async(
    hash_: str,
    args: Dict[str, Any] = None,
    scheduled_in_secs: Union[None, int] = None,
) -> str:
    return _client.run_script_by_hash_async(
        hash_=hash_,
        args=args,
        scheduled_in_secs=scheduled_in_secs,
    )


@init_global_client
def run_script_by_path_sync(
    path: str,
    args: Dict[str, Any] = None,
    verbose: bool = False,
    assert_result_is_not_none: bool = True,
    cleanup: bool = True,
    timeout: dt.timedelta = None,
) -> Any:
    return _client.run_script(
        path=path,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )


@init_global_client
def get_id_token(audience: str) -> str:
    """
    Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc.
    """
    return _client.get_id_token(audience)


@init_global_client
def get_job_status(job_id: str) -> JobStatus:
    return _client.get_job_status(job_id)


@init_global_client
def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
    return _client.get_result(
        job_id=job_id, assert_result_is_not_none=assert_result_is_not_none
    )


@init_global_client
def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection from DuckDB
    """
    return _client.get_duckdb_connection_settings(s3_resource_path)


@init_global_client
def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection from Polars
    """
    return _client.get_polars_connection_settings(s3_resource_path)


@init_global_client
def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings:
    """
    Convenience helper that takes an S3 resource as input and returns the settings necessary to
    initiate an S3 connection using boto3
    """
    return _client.get_boto3_connection_settings(s3_resource_path)


@init_global_client
def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes:
    """
    Load the entire content of a file stored in S3 as bytes
    """
    return _client.load_s3_file(
        s3object, s3_resource_path if s3_resource_path != "" else None
    )


@init_global_client
def load_s3_file_reader(
    s3object: S3Object | str, s3_resource_path: str | None = None
) -> BufferedReader:
    """
    Load the content of a file stored in S3
    """
    return _client.load_s3_file_reader(
        s3object, s3_resource_path if s3_resource_path != "" else None
    )


@init_global_client
def write_s3_file(
    s3object: S3Object | str | None,
    file_content: BufferedReader | bytes,
    s3_resource_path: str | None = None,
    content_type: str | None = None,
    content_disposition: str | None = None,
) -> S3Object:
    """
    Upload a file to S3

    Content type will be automatically guessed from path extension if left empty

    See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
    and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
    """
    return _client.write_s3_file(
        s3object,
        file_content,
        s3_resource_path if s3_resource_path != "" else None,
        content_type,
        content_disposition,
    )


@init_global_client
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]:
    """
    Sign S3 objects to be used by anonymous users in public apps
    Returns a list of signed S3 objects
    """
    return _client.sign_s3_objects(s3_objects)


@init_global_client
def sign_s3_object(s3_object: S3Object | str) -> S3Object:
    """
    Sign an S3 object to be used by anonymous users in public apps
    Returns a signed S3 object
    """
    return _client.sign_s3_object(s3_object)


@init_global_client
def get_presigned_s3_public_urls(
    s3_objects: list[S3Object | str],
    base_url: str | None = None,
) -> list[str]:
    """
    Generate presigned public URLs for an array of S3 objects.
    If an S3 object is not signed yet, it will be signed first.

    Args:
        s3_objects: List of S3 objects to sign
        base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

    Returns:
        List of signed public URLs

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
        >>> urls = wmill.get_presigned_s3_public_urls(s3_objs)
    """
    return _client.get_presigned_s3_public_urls(s3_objects, base_url)


@init_global_client
def get_presigned_s3_public_url(
    s3_object: S3Object | str,
    base_url: str | None = None,
) -> str:
    """
    Generate a presigned public URL for an S3 object.
    If the S3 object is not signed yet, it will be signed first.

    Args:
        s3_object: S3 object to sign
        base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

    Returns:
        Signed public URL

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_obj = S3Object(s3="/path/to/file.txt")
        >>> url = wmill.get_presigned_s3_public_url(s3_obj)
    """
    return _client.get_presigned_s3_public_url(s3_object, base_url)


@init_global_client
def whoami() -> dict:
    """
    Returns the current user
    """
    return _client.user


@init_global_client
@deprecate("Windmill().state")
def get_state() -> Any:
    """
    Get the state
    """
    return _client.state


@init_global_client
def get_resource(
    path: str,
    none_if_undefined: bool = False,
) -> dict | None:
    """Get resource from Windmill"""
    return _client.get_resource(path, none_if_undefined)


@init_global_client
def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
    """
    Set the resource at a given path, creating it if it does not exist
    """
    return _client.set_resource(value=value, path=path, resource_type=resource_type)


@init_global_client
def list_resources(
    resource_type: str = None,
    page: int = None,
    per_page: int = None,
) -> list[dict]:
    """List resources from Windmill workspace.

    Args:
        resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
        page: Optional page number for pagination
        per_page: Optional number of results per page

    Returns:
        List of resource dictionaries

    Example:
        >>> # Get all resources
        >>> all_resources = wmill.list_resources()

        >>> # Get only PostgreSQL resources
        >>> pg_resources = wmill.list_resources(resource_type="postgresql")
    """
    return _client.list_resources(
        resource_type=resource_type,
        page=page,
        per_page=per_page,
    )


@init_global_client
def set_state(value: Any) -> None:
    """
    Set the state
    """
    return _client.set_state(value)


@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
    """
    Set the progress
    """
    return _client.set_progress(value, job_id)


@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
    """
    Get the progress
    """
    return _client.get_progress(job_id)


def set_shared_state_pickle(value: Any, path="state.pickle") -> None:
    """
    Set the state in the shared folder using pickle
    """
    return Windmill.set_shared_state_pickle(value=value, path=path)


@deprecate("Windmill.get_shared_state_pickle(...)")
def get_shared_state_pickle(path="state.pickle") -> Any:
    """
    Get the state in the shared folder using pickle
    """
    return Windmill.get_shared_state_pickle(path=path)


def set_shared_state(value: Any, path="state.json") -> None:
    """
    Set the state in the shared folder using JSON
    """
    return Windmill.set_shared_state(value=value, path=path)


def get_shared_state(path="state.json") -> Any:
    """
    Get the state in the shared folder using JSON
    """
    return Windmill.get_shared_state(path=path)


1529@init_global_client
1530def get_variable(path: str) -> str:
1531    """
1532    Returns the variable at a given path as a string
1533    """
1534    return _client.get_variable(path)
1535
1536
1537@init_global_client
1538def set_variable(path: str, value: str, is_secret: bool = False) -> None:
1539    """
1540    Set the variable at a given path as a string, creating it if it does not exist
1541    """
1542    return _client.set_variable(path, value, is_secret)
1543
1544
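# Illustrative sketch: round-trip a secret variable; "u/admin/api_key" is a
# hypothetical variable path.
def _example_variables() -> str:
    import wmill

    wmill.set_variable("u/admin/api_key", "s3cr3t", is_secret=True)
    return wmill.get_variable("u/admin/api_key")
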
1545@init_global_client
1546def get_flow_user_state(key: str) -> Any:
1547    """
1548    Get the user state of a flow at a given key
1549    """
1550    return _client.get_flow_user_state(key)
1551
1552
1553@init_global_client
1554def set_flow_user_state(key: str, value: Any) -> None:
1555    """
1556    Set the user state of a flow at a given key
1557    """
1558    return _client.set_flow_user_state(key, value)
1559
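# Illustrative sketch: keep a cursor shared by all steps of the current flow
# run; the key name and batch size are placeholders.
def _example_flow_cursor() -> None:
    import wmill

    cursor = wmill.get_flow_user_state("cursor") or 0
    # ... process the next batch starting at `cursor` ...
    wmill.set_flow_user_state("cursor", cursor + 100)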
1560
1561@init_global_client
1562def get_state_path() -> str:
1563    return _client.state_path
1564
1565
1566@init_global_client
1567def get_resume_urls(approver: str = None) -> dict:
1568    return _client.get_resume_urls(approver)
1569
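# Illustrative sketch: fetch the URLs an approval step can send out (e.g. by
# email) so an approver can resume or cancel the suspended flow. Treating
# "resume" as a key of the returned dict is an assumption of this sketch.
def _example_resume_urls() -> str:
    import wmill

    urls = wmill.get_resume_urls(approver="alice")
    return urls.get("resume", "")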
1570
1571@init_global_client
1572def request_interactive_slack_approval(
1573    slack_resource_path: str,
1574    channel_id: str,
1575    message: str = None,
1576    approver: str = None,
1577    default_args_json: dict = None,
1578    dynamic_enums_json: dict = None,
1579) -> None:
1580    return _client.request_interactive_slack_approval(
1581        slack_resource_path=slack_resource_path,
1582        channel_id=channel_id,
1583        message=message,
1584        approver=approver,
1585        default_args_json=default_args_json,
1586        dynamic_enums_json=dynamic_enums_json,
1587    )
1588
1589
1590@init_global_client
1591def send_teams_message(
1592    conversation_id: str, text: str, success: bool, card_block: dict = None
1593):
1594    return _client.send_teams_message(conversation_id, text, success, card_block)
1595
1596
1597@init_global_client
1598def cancel_job(job_id: str, reason: str = None) -> str:
1599    """Cancel a specific job by ID.
1600
1601    Args:
1602        job_id: UUID of the job to cancel
1603        reason: Optional reason for cancellation
1604
1605    Returns:
1606        Response message from the cancel endpoint
1607    """
1608    return _client.cancel_job(job_id, reason)
1609
1610
1611@init_global_client
1612def cancel_running() -> dict:
1613    """Cancel currently running executions of the same script."""
1614    return _client.cancel_running()
1615
1616
1617@init_global_client
1618def run_script(
1619    path: str = None,
1620    hash_: str = None,
1621    args: dict = None,
1622    timeout: dt.timedelta | int | float = None,
1623    verbose: bool = False,
1624    cleanup: bool = True,
1625    assert_result_is_not_none: bool = True,
1626) -> Any:
1627    """Run script synchronously and return its result.
1628    
1629    .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
1630    """
1631    return _client.run_script(
1632        path=path,
1633        hash_=hash_,
1634        args=args,
1635        verbose=verbose,
1636        assert_result_is_not_none=assert_result_is_not_none,
1637        cleanup=cleanup,
1638        timeout=timeout,
1639    )
1640
1641
1642@init_global_client
1643def run_script_by_path(
1644    path: str,
1645    args: dict = None,
1646    timeout: dt.timedelta | int | float = None,
1647    verbose: bool = False,
1648    cleanup: bool = True,
1649    assert_result_is_not_none: bool = True,
1650) -> Any:
1651    """Run script by path synchronously and return its result."""
1652    return _client.run_script_by_path(
1653        path=path,
1654        args=args,
1655        verbose=verbose,
1656        assert_result_is_not_none=assert_result_is_not_none,
1657        cleanup=cleanup,
1658        timeout=timeout,
1659    )
1660
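# Illustrative sketch: run another workspace script synchronously with a
# 60-second timeout; the script path and args are hypothetical.
def _example_run_script():
    import wmill

    return wmill.run_script_by_path(
        "f/examples/hello",
        args={"name": "world"},
        timeout=60,
    )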
1661
1662@init_global_client
1663def run_script_by_hash(
1664    hash_: str,
1665    args: dict = None,
1666    timeout: dt.timedelta | int | float = None,
1667    verbose: bool = False,
1668    cleanup: bool = True,
1669    assert_result_is_not_none: bool = True,
1670) -> Any:
1671    """Run script by hash synchronously and return its result."""
1672    return _client.run_script_by_hash(
1673        hash_=hash_,
1674        args=args,
1675        verbose=verbose,
1676        assert_result_is_not_none=assert_result_is_not_none,
1677        cleanup=cleanup,
1678        timeout=timeout,
1679    )
1680
1681@init_global_client
1682def run_inline_script_preview(
1683    content: str,
1684    language: str,
1685    args: dict = None,
1686) -> Any:
1687    """Run a script on the current worker without creating a job"""
1688    return _client.run_inline_script_preview(
1689        content=content,
1690        language=language,
1691        args=args,
1692    )
1693
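# Illustrative sketch: evaluate a throwaway snippet on the current worker;
# "python3" is the usual language identifier for Python scripts in Windmill.
def _example_inline_preview():
    import wmill

    return wmill.run_inline_script_preview(
        content="def main(x: int):\n    return x * 2",
        language="python3",
        args={"x": 21},
    )
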
1694@init_global_client
1695def username_to_email(username: str) -> str:
1696    """
1697    Get email from workspace username
1698    This method is particularly useful for apps that require the email address of the viewer:
1699    in the viewer context, WM_USERNAME is set to the username of the viewer, while WM_EMAIL is set to the email of the creator of the app.
1700    """
1701    return _client.username_to_email(username)
1702
1703
1704@init_global_client
1705def datatable(name: str = "main") -> DataTableClient:
1706    return _client.datatable(name)
1707
1708@init_global_client
1709def ducklake(name: str = "main") -> DucklakeClient:
1710    return _client.ducklake(name)
1711
1712def task(*args, **kwargs):
1713    from inspect import signature
1714
1715    def f(func, tag: str | None = None):
1716        if (
1717            os.environ.get("WM_JOB_ID") is None
1718            or os.environ.get("MAIN_OVERRIDE") == func.__name__
1719        ):
1720
1721            def inner(*args, **kwargs):
1722                return func(*args, **kwargs)
1723
1724            return inner
1725        else:
1726
1727            def inner(*args, **kwargs):
1728                global _client
1729                if _client is None:
1730                    _client = Windmill()
1731                w_id = os.environ.get("WM_WORKSPACE")
1732                job_id = os.environ.get("WM_JOB_ID")
1733                f_name = func.__name__
1734                json = kwargs
1735                params = list(signature(func).parameters)
1736                for i, arg in enumerate(args):
1737                    if i < len(params):
1738                        p = params[i]
1739                        key = p
1740                        if key not in kwargs:
1741                            json[key] = arg
1742
1743                params = {}
1744                if tag is not None:
1745                    params["tag"] = tag
1746                w_as_code_response = _client.post(
1747                    f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}",
1748                    json={"args": json},
1749                    params=params,
1750                )
1751                job_id = w_as_code_response.text
1752                print(f"Executing task {func.__name__} on job {job_id}")
1753                job_result = _client.wait_job(job_id)
1754                print(f"Task {func.__name__} ({job_id}) completed")
1755                return job_result
1756
1757            return inner
1758
1759    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
1760        return f(args[0], None)
1761    else:
1762        return lambda x: f(x, kwargs.get("tag"))
1763
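# Illustrative sketch of workflow-as-code: outside a job (WM_JOB_ID unset)
# the decorated functions run inline; inside a job, each call is submitted as
# its own Windmill job and awaited, per the branches above. "gpu" is a
# hypothetical worker tag.
@task
def _example_double(x: int) -> int:
    return x * 2

@task(tag="gpu")
def _example_heavy(y: int) -> int:
    return y + 1

def _example_pipeline() -> int:
    return _example_heavy(_example_double(21))
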
1764def parse_resource_syntax(s: str) -> Optional[str]:
1765    """Parse resource syntax from string."""
1766    if s is None:
1767        return None
1768    if s.startswith("$res:"):
1769        return s[5:]
1770    if s.startswith("res://"):
1771        return s[6:]
1772    return None
1773
1774def parse_s3_object(s3_object: S3Object | str) -> S3Object:
1775    """Parse S3 object from string or S3Object format."""
1776    if isinstance(s3_object, str):
1777        match = re.match(r'^s3://([^/]*)/(.*)$', s3_object)
1778        if match:
1779            return S3Object(s3=match.group(2) or "", storage=match.group(1) or None)
1780        return S3Object(s3="")
1781    else:
1782        return s3_object
1783
1784    
1785
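# Illustrative spot-checks (S3Object is dict-like, so equality compares keys):
def _example_parse_s3():
    assert parse_s3_object("s3://mybucket/path/file.txt") == S3Object(
        s3="path/file.txt", storage="mybucket"
    )
    # Strings without the s3:// scheme fall back to an empty key.
    assert parse_s3_object("plain.txt") == S3Object(s3="")
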
1786def parse_variable_syntax(s: str) -> Optional[str]:
1787    """Parse variable syntax from string."""
1788    if s.startswith("var://"):
1789        return s[6:]
1790    return None
1791
1792
1793def append_to_result_stream(text: str) -> None:
1794    """Append a text to the result stream.
1795    
1796    Args:
1797        text: text to append to the result stream
1798    """
1799    print("WM_STREAM: {}".format(text.replace(chr(10), '\\n')))
1800
1801def stream_result(stream) -> None:
1802    """Stream to the result stream.
1803    
1804    Args:
1805        stream: iterable of text chunks to append to the result stream
1806    """
1807    for text in stream:
1808        append_to_result_stream(text)
1809
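# Illustrative sketch: surface partial output (e.g. tokens from a generator)
# in the job's result stream as it is produced.
def _example_stream_tokens():
    def tokens():
        yield "Hello"
        yield " world"

    stream_result(tokens())          # one WM_STREAM line per chunk
    append_to_result_stream("done")  # single trailing line
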
1810class DataTableClient:
1811    def __init__(self, client: Windmill, name: str):
1812        self.client = client
1813        self.name = name
1814    def query(self, sql: str, *args):
1815        args_dict = {}
1816        args_def = ""
1817        for i, arg in enumerate(args):
1818            args_dict[f"arg{i+1}"] = arg
1819            args_def += f"-- ${i+1} arg{i+1}\n"
1820        sql = args_def + sql
1821        return SqlQuery(
1822            sql, 
1823            lambda sql: self.client.run_inline_script_preview(
1824                content=sql,
1825                language="postgresql",
1826                args={"database": f"datatable://{self.name}", **args_dict},
1827            )
1828        )
1829
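# Illustrative sketch: positional parameters become $1, $2, ... in the SQL
# header built by query() above; the table and column names are hypothetical.
def _example_datatable():
    import wmill

    q = wmill.datatable().query("SELECT id, name FROM users WHERE age > $1", 18)
    return q.fetch()
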
1830class DucklakeClient:
1831    def __init__(self, client: Windmill, name: str):
1832        self.client = client
1833        self.name = name
1834    def query(self, sql: str, **kwargs):
1835        args_dict = {}
1836        args_def = ""
1837        for key, value in kwargs.items():
1838            args_dict[key] = value
1839            args_def += f"-- ${key} ({infer_sql_type(value)})\n"
1840        attach = f"ATTACH 'ducklake://{self.name}' AS dl;USE dl;\n"
1841        sql = args_def + attach + sql
1842        return SqlQuery(
1843            sql, 
1844            lambda sql: self.client.run_inline_script_preview(
1845                content=sql,
1846                language="duckdb",
1847                args=args_dict,
1848            )
1849        )
1850
1851class SqlQuery:
1852    def __init__(self, sql: str, fetch_fn):
1853        self.sql = sql
1854        self.fetch_fn = fetch_fn
1855    def fetch(self, result_collection: str | None = None):
1856        sql = self.sql
1857        if result_collection is not None:
1858            sql = f'-- result_collection={result_collection}\n{sql}'
1859        return self.fetch_fn(sql)
1860    def fetch_one(self):
1861        return self.fetch(result_collection="last_statement_first_row")
1862
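# Illustrative sketch: named parameters become typed $key placeholders (see
# infer_sql_type below), and fetch_one() narrows the result to the first row
# of the last statement. The events table is hypothetical.
def _example_ducklake():
    import wmill

    q = wmill.ducklake().query(
        "SELECT count(*) AS n FROM events WHERE kind = $kind",
        kind="click",
    )
    return q.fetch_one()
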
1863def infer_sql_type(value) -> str:
1864    """
1865    The DuckDB executor requires explicit argument types at declaration.
1866    These types exist in both DuckDB and Postgres;
1867    check that the target engine supports them if you plan to extend this function to other SQL engines.
1868    """
1869    if isinstance(value, bool):
1870        # Check bool before int since bool is a subclass of int in Python
1871        return "BOOLEAN"
1872    elif isinstance(value, int):
1873        return "BIGINT"
1874    elif isinstance(value, float):
1875        return "DOUBLE PRECISION"
1876    elif value is None:
1877        return "TEXT"
1878    elif isinstance(value, str):
1879        return "TEXT"
1880    elif isinstance(value, dict) or isinstance(value, list):
1881        return "JSON"
1882    else:
1883        return "TEXT"
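
# Illustrative spot-checks of the mapping above:
def _example_infer_sql_type():
    assert infer_sql_type(True) == "BOOLEAN"  # bool is checked before int
    assert infer_sql_type(3) == "BIGINT"
    assert infer_sql_type(1.5) == "DOUBLE PRECISION"
    assert infer_sql_type({"a": 1}) == "JSON"
    assert infer_sql_type(None) == "TEXT"
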
logger = <Logger windmill_client (WARNING)>
JobStatus = typing.Literal['RUNNING', 'WAITING', 'COMPLETED']
class Windmill:
  35class Windmill:
  36    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
  37        base = (
  38            base_url
  39            or os.environ.get("BASE_INTERNAL_URL")
  40            or os.environ.get("WM_BASE_URL")
  41        )
  42
  43        self.base_url = f"{base}/api"
  44        self.token = token or os.environ.get("WM_TOKEN")
  45        self.headers = {
  46            "Content-Type": "application/json",
  47            "Authorization": f"Bearer {self.token}",
  48        }
  49        self.verify = verify
  50        self.client = self.get_client()
  51        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
  52        self.path = os.environ.get("WM_JOB_PATH")
  53
  54        self.mocked_api = self.get_mocked_api()
  55
  56        assert self.workspace, (
  57            f"workspace required as an argument or as WM_WORKSPACE environment variable"
  58        )
  59
  60    def get_mocked_api(self) -> Optional[dict]:
  61        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
  62        if not mocked_path:
  63            return None
  64        logger.info("Using mocked API from %s", mocked_path)
  65        mocked_api = {"variables": {}, "resources": {}}
  66        try:
  67            with open(mocked_path, "r") as f:
  68                incoming_mocked_api = json.load(f)
  69            mocked_api = {**mocked_api, **incoming_mocked_api}
  70        except Exception as e:
  71            logger.warning(
  72                "Error parsing mocked API file at path %s Using empty mocked API.",
  73                mocked_path,
  74            )
  75            logger.debug(e)
  76        return mocked_api
  77
  78    def get_client(self) -> httpx.Client:
  79        return httpx.Client(
  80            base_url=self.base_url,
  81            headers=self.headers,
  82            verify=self.verify,
  83        )
  84
  85    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
  86        endpoint = endpoint.lstrip("/")
  87        resp = self.client.get(f"/{endpoint}", **kwargs)
  88        if raise_for_status:
  89            try:
  90                resp.raise_for_status()
  91            except httpx.HTTPStatusError as err:
  92                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
  93                logger.error(error)
  94                raise Exception(error)
  95        return resp
  96
  97    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
  98        endpoint = endpoint.lstrip("/")
  99        resp = self.client.post(f"/{endpoint}", **kwargs)
 100        if raise_for_status:
 101            try:
 102                resp.raise_for_status()
 103            except httpx.HTTPStatusError as err:
 104                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
 105                logger.error(error)
 106                raise Exception(error)
 107        return resp
 108
 109    def create_token(self, duration=dt.timedelta(days=1)) -> str:
 110        endpoint = "/users/tokens/create"
 111        payload = {
 112            "label": f"refresh {time.time()}",
 113            "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"),
 114        }
 115        return self.post(endpoint, json=payload).text
 116
 117    def run_script_async(
 118        self,
 119        path: str = None,
 120        hash_: str = None,
 121        args: dict = None,
 122        scheduled_in_secs: int = None,
 123    ) -> str:
 124        """Create a script job and return its job id.
 125        
 126        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
 127        """
 128        logging.warning(
 129            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
 130        )
 131        assert not (path and hash_), "path and hash_ are mutually exclusive"
 132        return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
 133
 134    def _run_script_async_internal(
 135        self,
 136        path: str = None,
 137        hash_: str = None,
 138        args: dict = None,
 139        scheduled_in_secs: int = None,
 140    ) -> str:
 141        """Internal helper for running scripts asynchronously."""
 142        args = args or {}
 143        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
 144        if os.environ.get("WM_JOB_ID"):
 145            params["parent_job"] = os.environ.get("WM_JOB_ID")
 146        if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
 147            params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
 148        
 149        if path:
 150            endpoint = f"/w/{self.workspace}/jobs/run/p/{path}"
 151        elif hash_:
 152            endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}"
 153        else:
 154            raise Exception("path or hash_ must be provided")
 155        
 156        return self.post(endpoint, json=args, params=params).text
 157
 158    def run_script_by_path_async(
 159        self,
 160        path: str,
 161        args: dict = None,
 162        scheduled_in_secs: int = None,
 163    ) -> str:
 164        """Create a script job by path and return its job id."""
 165        return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)
 166
 167    def run_script_by_hash_async(
 168        self,
 169        hash_: str,
 170        args: dict = None,
 171        scheduled_in_secs: int = None,
 172    ) -> str:
 173        """Create a script job by hash and return its job id."""
 174        return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
 175
 176    def run_flow_async(
 177        self,
 178        path: str,
 179        args: dict = None,
 180        scheduled_in_secs: int = None,
  181        # can only be set to False if the job will be fully awaited and not run concurrently with any other job,
  182        # as otherwise the child flow and its own children will store their state in the parent job, which will
  183        # lead to incorrectness and failures
 184        do_not_track_in_parent: bool = True,
 185    ) -> str:
 186        """Create a flow job and return its job id."""
 187        args = args or {}
 188        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
 189        if not do_not_track_in_parent:
 190            if os.environ.get("WM_JOB_ID"):
 191                params["parent_job"] = os.environ.get("WM_JOB_ID")
 192            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
 193                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
 194        if path:
 195            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
 196        else:
 197            raise Exception("path must be provided")
 198        return self.post(endpoint, json=args, params=params).text
 199
 200    def run_script(
 201        self,
 202        path: str = None,
 203        hash_: str = None,
 204        args: dict = None,
 205        timeout: dt.timedelta | int | float | None = None,
 206        verbose: bool = False,
 207        cleanup: bool = True,
 208        assert_result_is_not_none: bool = False,
 209    ) -> Any:
 210        """Run script synchronously and return its result.
 211        
 212        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
 213        """
 214        logging.warning(
 215            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
 216        )
 217        assert not (path and hash_), "path and hash_ are mutually exclusive"
 218        return self._run_script_internal(
 219            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
 220            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 221        )
 222
 223    def _run_script_internal(
 224        self,
 225        path: str = None,
 226        hash_: str = None,
 227        args: dict = None,
 228        timeout: dt.timedelta | int | float | None = None,
 229        verbose: bool = False,
 230        cleanup: bool = True,
 231        assert_result_is_not_none: bool = False,
 232    ) -> Any:
 233        """Internal helper for running scripts synchronously."""
 234        args = args or {}
 235
 236        if verbose:
 237            if path:
 238                logger.info(f"running `{path}` synchronously with {args = }")
 239            elif hash_:
 240                logger.info(f"running script with hash `{hash_}` synchronously with {args = }")
 241
 242        if isinstance(timeout, dt.timedelta):
 243            timeout = timeout.total_seconds()
 244
 245        job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args)
 246        return self.wait_job(
 247            job_id, timeout, verbose, cleanup, assert_result_is_not_none
 248        )
 249
 250    def run_script_by_path(
 251        self,
 252        path: str,
 253        args: dict = None,
 254        timeout: dt.timedelta | int | float | None = None,
 255        verbose: bool = False,
 256        cleanup: bool = True,
 257        assert_result_is_not_none: bool = False,
 258    ) -> Any:
 259        """Run script by path synchronously and return its result."""
 260        return self._run_script_internal(
 261            path=path, args=args, timeout=timeout, verbose=verbose,
 262            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 263        )
 264
 265    def run_script_by_hash(
 266        self,
 267        hash_: str,
 268        args: dict = None,
 269        timeout: dt.timedelta | int | float | None = None,
 270        verbose: bool = False,
 271        cleanup: bool = True,
 272        assert_result_is_not_none: bool = False,
 273    ) -> Any:
 274        """Run script by hash synchronously and return its result."""
 275        return self._run_script_internal(
 276            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
 277            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 278        )
 279
 280    def run_inline_script_preview(
 281        self,
 282        content: str,
 283        language: str,
 284        args: dict = None,
 285    ) -> Any:
 286        """Run a script on the current worker without creating a job"""
 287        endpoint = f"/w/{self.workspace}/jobs/run_inline/preview"
 288        body = {
 289            "content": content,
 290            "language": language,
 291            "args": args or {},
 292        }
 293        return self.post(endpoint, json=body).text
 294
 295    def wait_job(
 296        self,
 297        job_id,
 298        timeout: dt.timedelta | int | float | None = None,
 299        verbose: bool = False,
 300        cleanup: bool = True,
 301        assert_result_is_not_none: bool = False,
 302    ):
 303        def cancel_job():
 304            logger.warning(f"cancelling job: {job_id}")
 305            self.post(
 306                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
 307                json={"reason": "parent script cancelled"},
 308            ).raise_for_status()
 309
 310        if cleanup:
 311            atexit.register(cancel_job)
 312
 313        start_time = time.time()
 314
 315        if isinstance(timeout, dt.timedelta):
 316            timeout = timeout.total_seconds()
 317
 318        while True:
 319            result_res = self.get(
 320                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
 321            ).json()
 322
 323            started = result_res["started"]
 324            completed = result_res["completed"]
 325            success = result_res["success"]
 326
 327            if not started and verbose:
 328                logger.info(f"job {job_id} has not started yet")
 329
 330            if cleanup and completed:
 331                atexit.unregister(cancel_job)
 332
 333            if completed:
 334                result = result_res["result"]
 335                if success:
 336                    if result is None and assert_result_is_not_none:
 337                        raise Exception("Result was none")
 338                    return result
 339                else:
 340                    error = result["error"]
 341                    raise Exception(f"Job {job_id} was not successful: {str(error)}")
 342
 343            if timeout and ((time.time() - start_time) > timeout):
 344                msg = "reached timeout"
 345                logger.warning(msg)
 346                self.post(
 347                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
 348                    json={"reason": msg},
 349                )
 350                raise TimeoutError(msg)
 351            if verbose:
 352                logger.info(f"sleeping 0.5 seconds for {job_id = }")
 353
 354            time.sleep(0.5)
 355
 356    def cancel_job(self, job_id: str, reason: str = None) -> str:
 357        """Cancel a specific job by ID.
 358
 359        Args:
 360            job_id: UUID of the job to cancel
 361            reason: Optional reason for cancellation
 362
 363        Returns:
 364            Response message from the cancel endpoint
 365        """
 366        logger.info(f"cancelling job: {job_id}")
 367
 368        payload = {"reason": reason or "cancelled via cancel_job method"}
 369
 370        response = self.post(
 371            f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
 372            json=payload,
 373        )
 374
 375        return response.text
 376
 377    def cancel_running(self) -> dict:
 378        """Cancel currently running executions of the same script."""
 379        logger.info("canceling running executions of this script")
 380
 381        jobs = self.get(
 382            f"/w/{self.workspace}/jobs/list",
 383            params={
 384                "running": "true",
 385                "script_path_exact": self.path,
 386            },
 387        ).json()
 388
 389        current_job_id = os.environ.get("WM_JOB_ID")
 390
 391        logger.debug(f"{current_job_id = }")
 392
 393        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]
 394
 395        if job_ids:
 396            logger.info(f"cancelling the following job ids: {job_ids}")
 397        else:
 398            logger.info("no previous executions to cancel")
 399
 400        result = {}
 401
 402        for id_ in job_ids:
 403            result[id_] = self.post(
 404                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
 405                json={"reason": "killed by `cancel_running` method"},
 406            )
 407
 408        return result
 409
 410    def get_job(self, job_id: str) -> dict:
 411        return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json()
 412
 413    def get_root_job_id(self, job_id: str | None = None) -> dict:
 414        job_id = job_id or os.environ.get("WM_JOB_ID")
 415        return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json()
 416
 417    def get_id_token(self, audience: str, expires_in: int | None = None) -> str:
 418        params = {}
 419        if expires_in is not None:
 420            params["expires_in"] = expires_in
 421        return self.post(f"/w/{self.workspace}/oidc/token/{audience}", params=params).text
 422
 423    def get_job_status(self, job_id: str) -> JobStatus:
 424        job = self.get_job(job_id)
 425        job_type = job.get("type", "")
 426        assert job_type, f"{job} is not a valid job"
 427        if job_type.lower() == "completedjob":
 428            return "COMPLETED"
 429        if job.get("running"):
 430            return "RUNNING"
 431        return "WAITING"
 432
 433    def get_result(
 434        self,
 435        job_id: str,
 436        assert_result_is_not_none: bool = True,
 437    ) -> Any:
 438        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
 439        result_text = result.text
 440        if assert_result_is_not_none and result_text is None:
 441            raise Exception(f"result is None for {job_id = }")
 442        try:
 443            return result.json()
 444        except JSONDecodeError:
 445            return result_text
 446
 447    def get_variable(self, path: str) -> str:
 448        path = parse_variable_syntax(path) or path
 449        if self.mocked_api is not None:
 450            variables = self.mocked_api["variables"]
 451            try:
 452                result = variables[path]
 453                return result
 454            except KeyError:
 455                logger.info(
 456                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
 457                )
 458
 459        """Get variable from Windmill"""
 460        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()
 461
 462    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
 463        path = parse_variable_syntax(path) or path
 464        if self.mocked_api is not None:
 465            self.mocked_api["variables"][path] = value
 466            return
 467
 468        """Set variable from Windmill"""
 469        # check if variable exists
 470        r = self.get(
 471            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
 472        )
 473        if r.status_code == 404:
 474            # create variable
 475            self.post(
 476                f"/w/{self.workspace}/variables/create",
 477                json={
 478                    "path": path,
 479                    "value": value,
 480                    "is_secret": is_secret,
 481                    "description": "",
 482                },
 483            )
 484        else:
 485            # update variable
 486            self.post(
 487                f"/w/{self.workspace}/variables/update/{path}",
 488                json={"value": value},
 489            )
 490
 491    def get_resource(
 492        self,
 493        path: str,
 494        none_if_undefined: bool = False,
 495    ) -> dict | None:
 496        path = parse_resource_syntax(path) or path
 497        if self.mocked_api is not None:
 498            resources = self.mocked_api["resources"]
 499            try:
 500                result = resources[path]
 501                return result
 502            except KeyError:
 503                # NOTE: should mocked_api respect `none_if_undefined`?
 504                if none_if_undefined:
 505                    logger.info(
 506                        f"resource not found at ${path}, but none_if_undefined is True, so returning None"
 507                    )
 508                    return None
 509                logger.info(
 510                    f"MockedAPI present, but resource not found at ${path}, falling back to real API"
 511                )
 512
 513        """Get resource from Windmill"""
 514        try:
 515            return self.get(
 516                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
 517            ).json()
 518        except Exception as e:
 519            if none_if_undefined:
 520                return None
 521            logger.error(e)
 522            raise e
 523
 524    def set_resource(
 525        self,
 526        value: Any,
 527        path: str,
 528        resource_type: str,
 529    ):
 530        path = parse_resource_syntax(path) or path
 531        if self.mocked_api is not None:
 532            self.mocked_api["resources"][path] = value
 533            return
 534
 535        # check if resource exists
 536        r = self.get(
 537            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
 538        )
 539        if r.status_code == 404:
 540            # create resource
 541            self.post(
 542                f"/w/{self.workspace}/resources/create",
 543                json={
 544                    "path": path,
 545                    "value": value,
 546                    "resource_type": resource_type,
 547                },
 548            )
 549        else:
 550            # update resource
 551            self.post(
 552                f"/w/{self.workspace}/resources/update_value/{path}",
 553                json={"value": value},
 554            )
 555
 556    def list_resources(
 557        self,
 558        resource_type: str = None,
 559        page: int = None,
 560        per_page: int = None,
 561    ) -> list[dict]:
 562        """List resources from Windmill workspace.
 563        
 564        Args:
 565            resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
 566            page: Optional page number for pagination
 567            per_page: Optional number of results per page
 568            
 569        Returns:
 570            List of resource dictionaries
 571        """
 572        params = {}
 573        if resource_type is not None:
 574            params["resource_type"] = resource_type
 575        if page is not None:
 576            params["page"] = page
 577        if per_page is not None:
 578            params["per_page"] = per_page
 579            
 580        return self.get(
 581            f"/w/{self.workspace}/resources/list",
 582            params=params if params else None,
 583        ).json()
 584    
 585    def set_state(self, value: Any):
 586        self.set_resource(value, path=self.state_path, resource_type="state")
 587
 588    def set_progress(self, value: int, job_id: Optional[str] = None):
 589        workspace = get_workspace()
 590        flow_id = os.environ.get("WM_FLOW_JOB_ID")
 591        job_id = job_id or os.environ.get("WM_JOB_ID")
 592
  593        if job_id is not None:
 594            job = self.get_job(job_id)
 595            flow_id = job.get("parent_job")
 596
 597        self.post(
 598            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
 599            json={
 600                "percent": value,
 601                "flow_job_id": flow_id or None,
 602            },
 603        )
 604
 605    def get_progress(self, job_id: Optional[str] = None) -> Any:
 606        workspace = get_workspace()
 607        job_id = job_id or os.environ.get("WM_JOB_ID")
 608
 609        r = self.get(
 610            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
 611        )
 612        if r.status_code == 404:
 613            print(f"Job {job_id} does not exist")
 614            return None
 615        else:
 616            return r.json()
 617
 618    def set_flow_user_state(self, key: str, value: Any) -> None:
 619        """Set the user state of a flow at a given key"""
 620        flow_id = self.get_root_job_id()
 621        r = self.post(
 622            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
 623            json=value,
 624            raise_for_status=False,
 625        )
 626        if r.status_code == 404:
 627            print(f"Job {flow_id} does not exist or is not a flow")
 628
 629    def get_flow_user_state(self, key: str) -> Any:
 630        """Get the user state of a flow at a given key"""
 631        flow_id = self.get_root_job_id()
 632        r = self.get(
 633            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
 634            raise_for_status=False,
 635        )
 636        if r.status_code == 404:
 637            print(f"Job {flow_id} does not exist or is not a flow")
 638            return None
 639        else:
 640            return r.json()
 641
 642    @property
 643    def version(self):
 644        return self.get("version").text
 645
 646    def get_duckdb_connection_settings(
 647        self,
 648        s3_resource_path: str = "",
 649    ) -> DuckDbConnectionSettings | None:
 650        """
  651        Convenience helper that takes an S3 resource as input and returns the settings necessary to
 652        initiate an S3 connection from DuckDB
 653        """
 654        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 655        try:
 656            raw_obj = self.post(
 657                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
 658                json={}
 659                if s3_resource_path == ""
 660                else {"s3_resource_path": s3_resource_path},
 661            ).json()
 662            return DuckDbConnectionSettings(raw_obj)
 663        except JSONDecodeError as e:
 664            raise Exception(
 665                "Could not generate DuckDB S3 connection settings from the provided resource"
 666            ) from e
 667
 668    def get_polars_connection_settings(
 669        self,
 670        s3_resource_path: str = "",
 671    ) -> PolarsConnectionSettings:
 672        """
  673        Convenience helper that takes an S3 resource as input and returns the settings necessary to
 674        initiate an S3 connection from Polars
 675        """
 676        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 677        try:
 678            raw_obj = self.post(
 679                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
 680                json={}
 681                if s3_resource_path == ""
 682                else {"s3_resource_path": s3_resource_path},
 683            ).json()
 684            return PolarsConnectionSettings(raw_obj)
 685        except JSONDecodeError as e:
 686            raise Exception(
 687                "Could not generate Polars S3 connection settings from the provided resource"
 688            ) from e
 689
 690    def get_boto3_connection_settings(
 691        self,
 692        s3_resource_path: str = "",
 693    ) -> Boto3ConnectionSettings:
 694        """
  695        Convenience helper that takes an S3 resource as input and returns the settings necessary to
 696        initiate an S3 connection using boto3
 697        """
 698        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 699        try:
 700            s3_resource = self.post(
 701                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
 702                json={}
 703                if s3_resource_path == ""
 704                else {"s3_resource_path": s3_resource_path},
 705            ).json()
 706            return self.__boto3_connection_settings(s3_resource)
 707        except JSONDecodeError as e:
 708            raise Exception(
 709                "Could not generate Boto3 S3 connection settings from the provided resource"
 710            ) from e
 711
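    # Illustrative sketch (comment only): the settings above are meant to be
    # splatted into boto3, e.g.
    #
    #     import boto3
    #     settings = client.get_boto3_connection_settings("u/user/my_s3")
    #     s3 = boto3.client("s3", **settings)
    #
    # "u/user/my_s3" is a hypothetical resource path; the keys produced by
    # __boto3_connection_settings below (endpoint_url, region_name, use_ssl,
    # aws_access_key_id, aws_secret_access_key) are valid boto3 client kwargs.
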
 712    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
 713        """
  714        Load a file from the workspace S3 bucket and return its content as bytes.
 715
 716        '''python
 717        from wmill import S3Object
 718
 719        s3_obj = S3Object(s3="/path/to/my_file.txt")
  720        my_obj_content = client.load_s3_file(s3_obj, s3_resource_path=None)
 721        file_content = my_obj_content.decode("utf-8")
 722        '''
 723        """
 724        s3object = parse_s3_object(s3object)
 725        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
 726            return file_reader.read()
 727
 728    def load_s3_file_reader(
 729        self, s3object: S3Object | str, s3_resource_path: str | None
 730    ) -> BufferedReader:
 731        """
  732        Load a file from the workspace S3 bucket and return it as a bytes stream.
 733
 734        '''python
 735        from wmill import S3Object
 736
 737        s3_obj = S3Object(s3="/path/to/my_file.txt")
  738        with client.load_s3_file_reader(s3_obj, s3_resource_path=None) as file_reader:
 739            print(file_reader.read())
 740        '''
 741        """
 742        s3object = parse_s3_object(s3object)
 743        reader = S3BufferedReader(
 744            f"{self.workspace}",
 745            self.client,
 746            s3object["s3"],
 747            s3_resource_path,
 748            s3object["storage"] if "storage" in s3object else None,
 749        )
 750        return reader
 751
 752    def write_s3_file(
 753        self,
 754        s3object: S3Object | str | None,
 755        file_content: BufferedReader | bytes,
 756        s3_resource_path: str | None,
 757        content_type: str | None = None,
 758        content_disposition: str | None = None,
 759    ) -> S3Object:
 760        """
 761        Write a file to the workspace S3 bucket
 762
 763        '''python
 764        from wmill import S3Object
 765
 766        s3_obj = S3Object(s3="/path/to/my_file.txt")
 767
 768        # for an in memory bytes array:
 769        file_content = b'Hello Windmill!'
  770        client.write_s3_file(s3_obj, file_content, s3_resource_path=None)
 771
 772        # for a file:
 773        with open("my_file.txt", "rb") as my_file:
  774            client.write_s3_file(s3_obj, my_file, s3_resource_path=None)
 775        '''
 776        """
 777        s3object = parse_s3_object(s3object)
 778        # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator
 779        if isinstance(file_content, BufferedReader):
 780            content_payload = bytes_generator(file_content)
 781        elif isinstance(file_content, bytes):
 782            content_payload = file_content
 783        else:
 784            raise Exception("Type of file_content not supported")
 785
 786        query_params = {}
 787        if s3object is not None and s3object["s3"] != "":
 788            query_params["file_key"] = s3object["s3"]
 789        if s3_resource_path is not None and s3_resource_path != "":
 790            query_params["s3_resource_path"] = s3_resource_path
 791        if (
 792            s3object is not None
 793            and "storage" in s3object
 794            and s3object["storage"] is not None
 795        ):
 796            query_params["storage"] = s3object["storage"]
 797        if content_type is not None:
 798            query_params["content_type"] = content_type
 799        if content_disposition is not None:
 800            query_params["content_disposition"] = content_disposition
 801
 802        try:
 803            # need a vanilla client b/c content-type is not application/json here
 804            response = httpx.post(
 805                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
 806                headers={
 807                    "Authorization": f"Bearer {self.token}",
 808                    "Content-Type": "application/octet-stream",
 809                },
 810                params=query_params,
 811                content=content_payload,
 812                verify=self.verify,
 813                timeout=None,
 814            ).json()
 815        except Exception as e:
 816            raise Exception("Could not write file to S3") from e
 817        return S3Object(s3=response["file_key"])
 818
 819    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
 820        return self.post(
 821            f"/w/{self.workspace}/apps/sign_s3_objects", json={"s3_objects": list(map(parse_s3_object, s3_objects))}
 822        ).json()
 823
 824    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
 825        return self.post(
 826            f"/w/{self.workspace}/apps/sign_s3_objects",
 827            json={"s3_objects": [s3_object]},
 828        ).json()[0]
 829
 830    def get_presigned_s3_public_urls(
 831        self,
 832        s3_objects: list[S3Object | str],
 833        base_url: str | None = None,
 834    ) -> list[str]:
 835        """
 836        Generate presigned public URLs for an array of S3 objects.
 837        If an S3 object is not signed yet, it will be signed first.
 838
 839        Args:
 840            s3_objects: List of S3 objects to sign
 841            base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)
 842
 843        Returns:
 844            List of signed public URLs
 845
 846        Example:
 847            >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
 848            >>> urls = client.get_presigned_s3_public_urls(s3_objs)
 849        """
 850        base_url = base_url or self._get_public_base_url()
 851
 852        s3_objs = [parse_s3_object(s3_obj) for s3_obj in s3_objects]
 853
 854        # Sign all S3 objects that need to be signed in one go
 855        s3_objs_to_sign: list[tuple[S3Object, int]] = [
 856            (s3_obj, index)
 857            for index, s3_obj in enumerate(s3_objs)
 858            if s3_obj.get("presigned") is None
 859        ]
 860
 861        if s3_objs_to_sign:
 862            print(f"Signing {len(s3_objs_to_sign)} S3 objects...")
 863            signed_s3_objs = self.sign_s3_objects(
 864                [s3_obj for s3_obj, _ in s3_objs_to_sign]
 865            )
 866            for i, (_, original_index) in enumerate(s3_objs_to_sign):
 867                s3_objs[original_index] = parse_s3_object(signed_s3_objs[i])
 868
 869        signed_urls: list[str] = []
 870        for s3_obj in s3_objs:
 871            s3 = s3_obj.get("s3", "")
 872            presigned = s3_obj.get("presigned", "")
 873            storage = s3_obj.get("storage", "_default_")
 874            signed_url = f"{base_url}/api/w/{self.workspace}/s3_proxy/{storage}/{s3}?{presigned}"
 875            signed_urls.append(signed_url)
 876
 877        return signed_urls
 878
 879    def get_presigned_s3_public_url(
 880        self,
 881        s3_object: S3Object | str,
 882        base_url: str | None = None,
 883    ) -> str:
 884        """
 885        Generate a presigned public URL for an S3 object.
 886        If the S3 object is not signed yet, it will be signed first.
 887
 888        Args:
 889            s3_object: S3 object to sign
 890            base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)
 891
 892        Returns:
 893            Signed public URL
 894
 895        Example:
 896            >>> s3_obj = S3Object(s3="/path/to/file.txt")
 897            >>> url = client.get_presigned_s3_public_url(s3_obj)
 898        """
 899        urls = self.get_presigned_s3_public_urls([s3_object], base_url)
 900        return urls[0]
 901
 902    def _get_public_base_url(self) -> str:
 903        """Get the public base URL from environment or default to localhost"""
 904        return os.environ.get("WM_BASE_URL", "http://localhost:3000")
 905
 906    def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings:
 907        endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://"
 908        return Boto3ConnectionSettings(
 909            {
 910                "endpoint_url": "{}{}".format(
 911                    endpoint_url_prefix, s3_resource["endPoint"]
 912                ),
 913                "region_name": s3_resource["region"],
 914                "use_ssl": s3_resource["useSSL"],
 915                "aws_access_key_id": s3_resource["accessKey"],
 916                "aws_secret_access_key": s3_resource["secretKey"],
 917                # no need for path_style here as boto3 is clever enough to determine which one to use
 918            }
 919        )
 920
 921    def whoami(self) -> dict:
 922        return self.get("/users/whoami").json()
 923
 924    @property
 925    def user(self) -> dict:
 926        return self.whoami()
 927
 928    @property
 929    def state_path(self) -> str:
 930        state_path = os.environ.get(
 931            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
 932        )
 933        if state_path is None:
 934            raise Exception("State path not found")
 935        return state_path
 936
 937    @property
 938    def state(self) -> Any:
 939        return self.get_resource(path=self.state_path, none_if_undefined=True)
 940
 941    @state.setter
 942    def state(self, value: Any) -> None:
 943        self.set_state(value)
 944
 945    @staticmethod
 946    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
 947        """
 948        Set the state in the shared folder using pickle
 949        """
 950        import pickle
 951
 952        with open(f"/shared/{path}", "wb") as handle:
 953            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)
 954
 955    @staticmethod
 956    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
 957        """
 958        Get the state in the shared folder using pickle
 959        """
 960        import pickle
 961
 962        with open(f"/shared/{path}", "rb") as handle:
 963            return pickle.load(handle)
 964
 965    @staticmethod
 966    def set_shared_state(value: Any, path: str = "state.json") -> None:
 967        """
  968        Set the state in the shared folder using JSON
 969        """
 970        import json
 971
 972        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
 973            json.dump(value, f, ensure_ascii=False, indent=4)
 974
 975    @staticmethod
  976    def get_shared_state(path: str = "state.json") -> Any:
  977        """
  978        Get the state in the shared folder using JSON
 979        """
 980        import json
 981
 982        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
 983            return json.load(f)
 984
 985    def get_resume_urls(self, approver: str = None) -> dict:
 986        nonce = random.randint(0, 1000000000)
 987        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
 988        return self.get(
 989            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
 990            params={"approver": approver},
 991        ).json()
 992
 993    def request_interactive_slack_approval(
 994        self,
 995        slack_resource_path: str,
 996        channel_id: str,
 997        message: str = None,
 998        approver: str = None,
 999        default_args_json: dict = None,
1000        dynamic_enums_json: dict = None,
1001    ) -> None:
1002        """
1003        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
1004
1005        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
1006        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
1007
1008        :param slack_resource_path: The path to the Slack resource in Windmill.
1009        :type slack_resource_path: str
1010        :param channel_id: The Slack channel ID where the approval request will be sent.
1011        :type channel_id: str
1012        :param message: Optional custom message to include in the Slack approval request.
1013        :type message: str, optional
1014        :param approver: Optional user ID or name of the approver for the request.
1015        :type approver: str, optional
1016        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
1017        :type default_args_json: dict, optional
1018        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
1019        :type dynamic_enums_json: dict, optional
1020
1021        :raises Exception: If the function is not called within a flow or flow preview.
1022        :raises Exception: If the required flow job or flow step environment variables are not set.
1023
1024        :return: None
1025
1026        **Usage Example:**
1027            >>> client.request_interactive_slack_approval(
1028            ...     slack_resource_path="/u/alex/my_slack_resource",
1029            ...     channel_id="admins-slack-channel",
1030            ...     message="Please approve this request",
1031            ...     approver="approver123",
1032            ...     default_args_json={"key1": "value1", "key2": 42},
1033            ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
1034            ... )
1035
1036        **Notes:**
1037        - This function must be executed within a Windmill flow or flow preview.
1038        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
1039        """
1040        workspace = self.workspace
1041        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")
1042
1043        if not flow_job_id:
1044            raise Exception(
1045                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
1046            )
1047
1048        # Only include non-empty parameters
1049        params = {}
1050        if message:
1051            params["message"] = message
1052        if approver:
1053            params["approver"] = approver
1054        if slack_resource_path:
1055            params["slack_resource_path"] = slack_resource_path
1056        if channel_id:
1057            params["channel_id"] = channel_id
1058        if os.environ.get("WM_FLOW_STEP_ID"):
1059            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
1060        if default_args_json:
1061            params["default_args_json"] = json.dumps(default_args_json)
1062        if dynamic_enums_json:
1063            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)
1064
1065        self.get(
1066            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
1067            params=params,
1068        )
1069
1070    def username_to_email(self, username: str) -> str:
1071        """
1072        Get email from workspace username
 1073        This method is particularly useful for apps that require the email address of the viewer:
 1074        in the viewer context, WM_USERNAME is set to the username of the viewer, while WM_EMAIL is set to the email of the creator of the app.
1075        """
1076        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text
1077
1078    def send_teams_message(
1079        self,
1080        conversation_id: str,
1081        text: str,
1082        success: bool = True,
1083        card_block: dict = None,
1084    ):
1085        """
1086        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
1087        """
1088        return self.post(
1089            f"/teams/activities",
1090            json={
1091                "conversation_id": conversation_id,
1092                "text": text,
1093                "success": success,
1094                "card_block": card_block,
1095            },
1096        )
1097
1098    def datatable(self, name: str = "main"):
1099        return DataTableClient(self, name)
1100
1101    def ducklake(self, name: str = "main"):
1102        return DucklakeClient(self, name)
Windmill(base_url=None, token=None, workspace=None, verify=True)
36    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
37        base = (
38            base_url
39            or os.environ.get("BASE_INTERNAL_URL")
40            or os.environ.get("WM_BASE_URL")
41        )
42
43        self.base_url = f"{base}/api"
44        self.token = token or os.environ.get("WM_TOKEN")
45        self.headers = {
46            "Content-Type": "application/json",
47            "Authorization": f"Bearer {self.token}",
48        }
49        self.verify = verify
50        self.client = self.get_client()
51        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
52        self.path = os.environ.get("WM_JOB_PATH")
53
54        self.mocked_api = self.get_mocked_api()
55
56        assert self.workspace, (
57            f"workspace required as an argument or as WM_WORKSPACE environment variable"
58        )
base_url
token
headers
verify
client
workspace
path
mocked_api
def get_mocked_api(self) -> Optional[dict]:
60    def get_mocked_api(self) -> Optional[dict]:
61        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
62        if not mocked_path:
63            return None
64        logger.info("Using mocked API from %s", mocked_path)
65        mocked_api = {"variables": {}, "resources": {}}
66        try:
67            with open(mocked_path, "r") as f:
68                incoming_mocked_api = json.load(f)
69            mocked_api = {**mocked_api, **incoming_mocked_api}
70        except Exception as e:
71            logger.warning(
72                "Error parsing mocked API file at path %s Using empty mocked API.",
73                mocked_path,
74            )
75            logger.debug(e)
76        return mocked_api
def get_client(self) -> httpx.Client:
78    def get_client(self) -> httpx.Client:
79        return httpx.Client(
80            base_url=self.base_url,
81            headers=self.headers,
82            verify=self.verify,
83        )
def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
85    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
86        endpoint = endpoint.lstrip("/")
87        resp = self.client.get(f"/{endpoint}", **kwargs)
88        if raise_for_status:
89            try:
90                resp.raise_for_status()
91            except httpx.HTTPStatusError as err:
92                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
93                logger.error(error)
94                raise Exception(error)
95        return resp
def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
 97    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
 98        endpoint = endpoint.lstrip("/")
 99        resp = self.client.post(f"/{endpoint}", **kwargs)
100        if raise_for_status:
101            try:
102                resp.raise_for_status()
103            except httpx.HTTPStatusError as err:
104                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
105                logger.error(error)
106                raise Exception(error)
107        return resp
def create_token(self, duration=datetime.timedelta(days=1)) -> str:
109    def create_token(self, duration=dt.timedelta(days=1)) -> str:
110        endpoint = "/users/tokens/create"
111        payload = {
112            "label": f"refresh {time.time()}",
113            "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"),
114        }
115        return self.post(endpoint, json=payload).text
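
For illustration, a minimal sketch of minting a short-lived token; it assumes WM_TOKEN and WM_WORKSPACE are set in the environment so the client can authenticate:

import datetime as dt

from wmill.client import Windmill

client = Windmill()  # reads WM_TOKEN / WM_WORKSPACE from the environment
# Mint a token that expires in two hours instead of the one-day default.
two_hour_token = client.create_token(duration=dt.timedelta(hours=2))
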
def run_script_async( self, path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str:
117    def run_script_async(
118        self,
119        path: str = None,
120        hash_: str = None,
121        args: dict = None,
122        scheduled_in_secs: int = None,
123    ) -> str:
124        """Create a script job and return its job id.
125        
126        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
127        """
128        logging.warning(
129            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
130        )
131        assert not (path and hash_), "path and hash_ are mutually exclusive"
132        return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)

Create a script job and return its job id.

Deprecated: use run_script_by_path_async or run_script_by_hash_async instead.

def run_script_by_path_async(self, path: str, args: dict = None, scheduled_in_secs: int = None) -> str:
158    def run_script_by_path_async(
159        self,
160        path: str,
161        args: dict = None,
162        scheduled_in_secs: int = None,
163    ) -> str:
164        """Create a script job by path and return its job id."""
165        return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)

Create a script job by path and return its job id.

def run_script_by_hash_async( self, hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str:
167    def run_script_by_hash_async(
168        self,
169        hash_: str,
170        args: dict = None,
171        scheduled_in_secs: int = None,
172    ) -> str:
173        """Create a script job by hash and return its job id."""
174        return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)

Create a script job by hash and return its job id.

def run_flow_async( self, path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str:
176    def run_flow_async(
177        self,
178        path: str,
179        args: dict = None,
180        scheduled_in_secs: int = None,
181        # can only be set to False if the job will be fully awaited and not run concurrently
182        # with any other job, as otherwise the child flow and its own children will store
183        # their state in the parent job, which leads to incorrectness and failures
184        do_not_track_in_parent: bool = True,
185    ) -> str:
186        """Create a flow job and return its job id."""
187        args = args or {}
188        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
189        if not do_not_track_in_parent:
190            if os.environ.get("WM_JOB_ID"):
191                params["parent_job"] = os.environ.get("WM_JOB_ID")
192            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
193                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
194        if path:
195            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
196        else:
197            raise Exception("path must be provided")
198        return self.post(endpoint, json=args, params=params).text

Create a flow job and return its job id.
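
For instance, a hedged sketch (the flow path and args are hypothetical) that schedules a flow to start 60 seconds from now while keeping the default parent-tracking behaviour:

from wmill.client import Windmill

client = Windmill()
flow_job_id = client.run_flow_async(
    path="u/alex/my_flow",      # hypothetical flow path
    args={"customer_id": 42},
    scheduled_in_secs=60,
    # leave do_not_track_in_parent=True unless the flow is fully awaited
)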

def run_script( self, path: str = None, hash_: str = None, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:
200    def run_script(
201        self,
202        path: str = None,
203        hash_: str = None,
204        args: dict = None,
205        timeout: dt.timedelta | int | float | None = None,
206        verbose: bool = False,
207        cleanup: bool = True,
208        assert_result_is_not_none: bool = False,
209    ) -> Any:
210        """Run script synchronously and return its result.
211        
212        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
213        """
214        logging.warning(
215            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
216        )
217        assert not (path and hash_), "path and hash_ are mutually exclusive"
218        return self._run_script_internal(
219            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
220            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
221        )

Run script synchronously and return its result.

Deprecated: use run_script_by_path or run_script_by_hash instead.

def run_script_by_path( self, path: str, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:
250    def run_script_by_path(
251        self,
252        path: str,
253        args: dict = None,
254        timeout: dt.timedelta | int | float | None = None,
255        verbose: bool = False,
256        cleanup: bool = True,
257        assert_result_is_not_none: bool = False,
258    ) -> Any:
259        """Run script by path synchronously and return its result."""
260        return self._run_script_internal(
261            path=path, args=args, timeout=timeout, verbose=verbose,
262            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
263        )

Run script by path synchronously and return its result.

def run_script_by_hash( self, hash_: str, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:
265    def run_script_by_hash(
266        self,
267        hash_: str,
268        args: dict = None,
269        timeout: dt.timedelta | int | float | None = None,
270        verbose: bool = False,
271        cleanup: bool = True,
272        assert_result_is_not_none: bool = False,
273    ) -> Any:
274        """Run script by hash synchronously and return its result."""
275        return self._run_script_internal(
276            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
277            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
278        )

Run script by hash synchronously and return its result.

def run_inline_script_preview(self, content: str, language: str, args: dict = None) -> Any:
280    def run_inline_script_preview(
281        self,
282        content: str,
283        language: str,
284        args: dict = None,
285    ) -> Any:
286        """Run a script on the current worker without creating a job"""
287        endpoint = f"/w/{self.workspace}/jobs/run_inline/preview"
288        body = {
289            "content": content,
290            "language": language,
291            "args": args or {},
292        }
293        return self.post(endpoint, json=body).text

Run a script on the current worker without creating a job

def wait_job( self, job_id, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False):
295    def wait_job(
296        self,
297        job_id,
298        timeout: dt.timedelta | int | float | None = None,
299        verbose: bool = False,
300        cleanup: bool = True,
301        assert_result_is_not_none: bool = False,
302    ):
303        def cancel_job():
304            logger.warning(f"cancelling job: {job_id}")
305            self.post(
306                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
307                json={"reason": "parent script cancelled"},
308            ).raise_for_status()
309
310        if cleanup:
311            atexit.register(cancel_job)
312
313        start_time = time.time()
314
315        if isinstance(timeout, dt.timedelta):
316            timeout = timeout.total_seconds()
317
318        while True:
319            result_res = self.get(
320                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
321            ).json()
322
323            started = result_res["started"]
324            completed = result_res["completed"]
325            success = result_res["success"]
326
327            if not started and verbose:
328                logger.info(f"job {job_id} has not started yet")
329
330            if cleanup and completed:
331                atexit.unregister(cancel_job)
332
333            if completed:
334                result = result_res["result"]
335                if success:
336                    if result is None and assert_result_is_not_none:
337                        raise Exception("Result was none")
338                    return result
339                else:
340                    error = result["error"]
341                    raise Exception(f"Job {job_id} was not successful: {str(error)}")
342
343            if timeout and ((time.time() - start_time) > timeout):
344                msg = "reached timeout"
345                logger.warning(msg)
346                self.post(
347                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
348                    json={"reason": msg},
349                )
350                raise TimeoutError(msg)
351            if verbose:
352                logger.info(f"sleeping 0.5 seconds for {job_id = }")
353
354            time.sleep(0.5)
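
A typical pairing, sketched with a hypothetical script path: fire a job asynchronously, then block on it with a timeout. wait_job cancels the job itself if the timeout is reached, and (with cleanup=True) also cancels it if the calling script exits early:

import datetime as dt

from wmill.client import Windmill

client = Windmill()
job_id = client.run_script_by_path_async("u/alex/my_script", args={"n": 3})
# Polls every 0.5s; raises TimeoutError (and cancels the job) after 5 minutes.
result = client.wait_job(job_id, timeout=dt.timedelta(minutes=5), verbose=True)
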
def cancel_job(self, job_id: str, reason: str = None) -> str:
356    def cancel_job(self, job_id: str, reason: str = None) -> str:
357        """Cancel a specific job by ID.
358
359        Args:
360            job_id: UUID of the job to cancel
361            reason: Optional reason for cancellation
362
363        Returns:
364            Response message from the cancel endpoint
365        """
366        logger.info(f"cancelling job: {job_id}")
367
368        payload = {"reason": reason or "cancelled via cancel_job method"}
369
370        response = self.post(
371            f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
372            json=payload,
373        )
374
375        return response.text

Cancel a specific job by ID.

Args:
  • job_id: UUID of the job to cancel
  • reason: Optional reason for cancellation

Returns: Response message from the cancel endpoint

def cancel_running(self) -> dict:
377    def cancel_running(self) -> dict:
378        """Cancel currently running executions of the same script."""
379        logger.info("canceling running executions of this script")
380
381        jobs = self.get(
382            f"/w/{self.workspace}/jobs/list",
383            params={
384                "running": "true",
385                "script_path_exact": self.path,
386            },
387        ).json()
388
389        current_job_id = os.environ.get("WM_JOB_ID")
390
391        logger.debug(f"{current_job_id = }")
392
393        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]
394
395        if job_ids:
396            logger.info(f"cancelling the following job ids: {job_ids}")
397        else:
398            logger.info("no previous executions to cancel")
399
400        result = {}
401
402        for id_ in job_ids:
403            result[id_] = self.post(
404                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
405                json={"reason": "killed by `cancel_running` method"},
406            )
407
408        return result

Cancel currently running executions of the same script.

def get_job(self, job_id: str) -> dict:
410    def get_job(self, job_id: str) -> dict:
411        return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json()
def get_root_job_id(self, job_id: str | None = None) -> dict:
413    def get_root_job_id(self, job_id: str | None = None) -> dict:
414        job_id = job_id or os.environ.get("WM_JOB_ID")
415        return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json()
def get_id_token(self, audience: str, expires_in: int | None = None) -> str:
417    def get_id_token(self, audience: str, expires_in: int | None = None) -> str:
418        params = {}
419        if expires_in is not None:
420            params["expires_in"] = expires_in
421        return self.post(f"/w/{self.workspace}/oidc/token/{audience}", params=params).text
def get_job_status(self, job_id: str) -> Literal['RUNNING', 'WAITING', 'COMPLETED']:
423    def get_job_status(self, job_id: str) -> JobStatus:
424        job = self.get_job(job_id)
425        job_type = job.get("type", "")
426        assert job_type, f"{job} is not a valid job"
427        if job_type.lower() == "completedjob":
428            return "COMPLETED"
429        if job.get("running"):
430            return "RUNNING"
431        return "WAITING"
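
A small sketch combining the job helpers above (the script path is hypothetical):

from wmill.client import Windmill

client = Windmill()
job_id = client.run_script_by_path_async("u/alex/my_script", args={})
status = client.get_job_status(job_id)  # "RUNNING" | "WAITING" | "COMPLETED"
if status != "COMPLETED":
    client.cancel_job(job_id, reason="superseded by a newer run")
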
def get_result(self, job_id: str, assert_result_is_not_none: bool = True) -> Any:
433    def get_result(
434        self,
435        job_id: str,
436        assert_result_is_not_none: bool = True,
437    ) -> Any:
438        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
439        result_text = result.text
440        if assert_result_is_not_none and result_text is None:
441            raise Exception(f"result is None for {job_id = }")
442        try:
443            return result.json()
444        except JSONDecodeError:
445            return result_text
def get_variable(self, path: str) -> str:
447    def get_variable(self, path: str) -> str:
448        path = parse_variable_syntax(path) or path
449        if self.mocked_api is not None:
450            variables = self.mocked_api["variables"]
451            try:
452                result = variables[path]
453                return result
454            except KeyError:
455                logger.info(
456                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
457                )
458
459        """Get variable from Windmill"""
460        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()
def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
462    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
463        path = parse_variable_syntax(path) or path
464        if self.mocked_api is not None:
465            self.mocked_api["variables"][path] = value
466            return
467
468        """Set variable from Windmill"""
469        # check if variable exists
470        r = self.get(
471            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
472        )
473        if r.status_code == 404:
474            # create variable
475            self.post(
476                f"/w/{self.workspace}/variables/create",
477                json={
478                    "path": path,
479                    "value": value,
480                    "is_secret": is_secret,
481                    "description": "",
482                },
483            )
484        else:
485            # update variable
486            self.post(
487                f"/w/{self.workspace}/variables/update/{path}",
488                json={"value": value},
489            )
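
A minimal sketch of the variable round-trip; the variable path is hypothetical, and set_variable transparently creates the variable on first use:

from wmill.client import Windmill

client = Windmill()
client.set_variable("u/alex/counter", "42")    # creates or updates
value = client.get_variable("u/alex/counter")  # reads the stored value
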
def get_resource(self, path: str, none_if_undefined: bool = False) -> dict | None:
491    def get_resource(
492        self,
493        path: str,
494        none_if_undefined: bool = False,
495    ) -> dict | None:
496        path = parse_resource_syntax(path) or path
497        if self.mocked_api is not None:
498            resources = self.mocked_api["resources"]
499            try:
500                result = resources[path]
501                return result
502            except KeyError:
503                # NOTE: should mocked_api respect `none_if_undefined`?
504                if none_if_undefined:
505                    logger.info(
506                        f"resource not found at ${path}, but none_if_undefined is True, so returning None"
507                    )
508                    return None
509                logger.info(
510                    f"MockedAPI present, but resource not found at ${path}, falling back to real API"
511                )
512
513        """Get resource from Windmill"""
514        try:
515            return self.get(
516                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
517            ).json()
518        except Exception as e:
519            if none_if_undefined:
520                return None
521            logger.error(e)
522            raise e
def set_resource(self, value: Any, path: str, resource_type: str):
524    def set_resource(
525        self,
526        value: Any,
527        path: str,
528        resource_type: str,
529    ):
530        path = parse_resource_syntax(path) or path
531        if self.mocked_api is not None:
532            self.mocked_api["resources"][path] = value
533            return
534
535        # check if resource exists
536        r = self.get(
537            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
538        )
539        if r.status_code == 404:
540            # create resource
541            self.post(
542                f"/w/{self.workspace}/resources/create",
543                json={
544                    "path": path,
545                    "value": value,
546                    "resource_type": resource_type,
547                },
548            )
549        else:
550            # update resource
551            self.post(
552                f"/w/{self.workspace}/resources/update_value/{path}",
553                json={"value": value},
554            )
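
A hedged sketch of the resource round-trip (path, value, and resource_type are all hypothetical); like set_variable, set_resource creates the resource if it does not exist yet:

from wmill.client import Windmill

client = Windmill()
client.set_resource(
    value={"host": "db.internal", "port": 5432, "user": "app"},
    path="u/alex/my_postgres",
    resource_type="postgresql",
)
pg = client.get_resource("u/alex/my_postgres")
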
def list_resources( self, resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]:
556    def list_resources(
557        self,
558        resource_type: str = None,
559        page: int = None,
560        per_page: int = None,
561    ) -> list[dict]:
562        """List resources from Windmill workspace.
563        
564        Args:
565            resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
566            page: Optional page number for pagination
567            per_page: Optional number of results per page
568            
569        Returns:
570            List of resource dictionaries
571        """
572        params = {}
573        if resource_type is not None:
574            params["resource_type"] = resource_type
575        if page is not None:
576            params["page"] = page
577        if per_page is not None:
578            params["per_page"] = per_page
579            
580        return self.get(
581            f"/w/{self.workspace}/resources/list",
582            params=params if params else None,
583        ).json()

List resources from Windmill workspace.

Args:
  • resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
  • page: Optional page number for pagination
  • per_page: Optional number of results per page

Returns: List of resource dictionaries
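
For example, a sketch that fetches the first page of PostgreSQL resources; the assumption that each returned dict carries a "path" key follows the usual Windmill resource shape:

from wmill.client import Windmill

client = Windmill()
resources = client.list_resources(resource_type="postgresql", page=1, per_page=50)
for r in resources:
    print(r["path"])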

def set_state(self, value: Any):
585    def set_state(self, value: Any):
586        self.set_resource(value, path=self.state_path, resource_type="state")
def set_progress(self, value: int, job_id: Optional[str] = None):
588    def set_progress(self, value: int, job_id: Optional[str] = None):
589        workspace = get_workspace()
590        flow_id = os.environ.get("WM_FLOW_JOB_ID")
591        job_id = job_id or os.environ.get("WM_JOB_ID")
592
593        if job_id != None:
594            job = self.get_job(job_id)
595            flow_id = job.get("parent_job")
596
597        self.post(
598            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
599            json={
600                "percent": value,
601                "flow_job_id": flow_id or None,
602            },
603        )
def get_progress(self, job_id: Optional[str] = None) -> Any:
605    def get_progress(self, job_id: Optional[str] = None) -> Any:
606        workspace = get_workspace()
607        job_id = job_id or os.environ.get("WM_JOB_ID")
608
609        r = self.get(
610            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
611        )
612        if r.status_code == 404:
613            print(f"Job {job_id} does not exist")
614            return None
615        else:
616            return r.json()
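
A sketch of reporting progress from inside a running job; the worker sets WM_JOB_ID, so no explicit job_id is needed:

from wmill.client import Windmill

client = Windmill()
client.set_progress(25)      # report 25% for the current job
pct = client.get_progress()  # read it back (None if the job is unknown)
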
def set_flow_user_state(self, key: str, value: Any) -> None:
618    def set_flow_user_state(self, key: str, value: Any) -> None:
619        """Set the user state of a flow at a given key"""
620        flow_id = self.get_root_job_id()
621        r = self.post(
622            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
623            json=value,
624            raise_for_status=False,
625        )
626        if r.status_code == 404:
627            print(f"Job {flow_id} does not exist or is not a flow")

Set the user state of a flow at a given key

def get_flow_user_state(self, key: str) -> Any:
629    def get_flow_user_state(self, key: str) -> Any:
630        """Get the user state of a flow at a given key"""
631        flow_id = self.get_root_job_id()
632        r = self.get(
633            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
634            raise_for_status=False,
635        )
636        if r.status_code == 404:
637            print(f"Job {flow_id} does not exist or is not a flow")
638            return None
639        else:
640            return r.json()

Get the user state of a flow at a given key
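
A minimal sketch of sharing a small piece of state across steps of the same flow run (the key and payload are illustrative):

from wmill.client import Windmill

client = Windmill()
client.set_flow_user_state("cursor", {"last_seen_id": 123})
cursor = client.get_flow_user_state("cursor")  # None if unset or not in a flow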

version
642    @property
643    def version(self):
644        return self.get("version").text
def get_duckdb_connection_settings( self, s3_resource_path: str = '') -> wmill.s3_types.DuckDbConnectionSettings | None:
646    def get_duckdb_connection_settings(
647        self,
648        s3_resource_path: str = "",
649    ) -> DuckDbConnectionSettings | None:
650        """
651        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
652        initiate an S3 connection from DuckDB
653        """
654        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
655        try:
656            raw_obj = self.post(
657                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
658                json={}
659                if s3_resource_path == ""
660                else {"s3_resource_path": s3_resource_path},
661            ).json()
662            return DuckDbConnectionSettings(raw_obj)
663        except JSONDecodeError as e:
664            raise Exception(
665                "Could not generate DuckDB S3 connection settings from the provided resource"
666            ) from e

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB.
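
For example, a hedged sketch that feeds the settings into DuckDB; the resource path is hypothetical, the duckdb package is assumed to be installed, and the connection_settings_str attribute name is an assumption about wmill.s3_types.DuckDbConnectionSettings:

import duckdb

from wmill.client import Windmill

client = Windmill()
settings = client.get_duckdb_connection_settings("u/alex/my_s3")
con = duckdb.connect()
# connection_settings_str is assumed to hold the SET statements DuckDB needs.
con.execute(settings.connection_settings_str)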

def get_polars_connection_settings( self, s3_resource_path: str = '') -> wmill.s3_types.PolarsConnectionSettings:
668    def get_polars_connection_settings(
669        self,
670        s3_resource_path: str = "",
671    ) -> PolarsConnectionSettings:
672        """
673        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
674        initiate an S3 connection from Polars
675        """
676        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
677        try:
678            raw_obj = self.post(
679                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
680                json={}
681                if s3_resource_path == ""
682                else {"s3_resource_path": s3_resource_path},
683            ).json()
684            return PolarsConnectionSettings(raw_obj)
685        except JSONDecodeError as e:
686            raise Exception(
687                "Could not generate Polars S3 connection settings from the provided resource"
688            ) from e

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars.

def get_boto3_connection_settings( self, s3_resource_path: str = '') -> wmill.s3_types.Boto3ConnectionSettings:
690    def get_boto3_connection_settings(
691        self,
692        s3_resource_path: str = "",
693    ) -> Boto3ConnectionSettings:
694        """
695        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
696        initiate an S3 connection using boto3
697        """
698        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
699        try:
700            s3_resource = self.post(
701                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
702                json={}
703                if s3_resource_path == ""
704                else {"s3_resource_path": s3_resource_path},
705            ).json()
706            return self.__boto3_connection_settings(s3_resource)
707        except JSONDecodeError as e:
708            raise Exception(
709                "Could not generate Boto3 S3 connection settings from the provided resource"
710            ) from e

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3.

def load_s3_file( self, s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None) -> bytes:
712    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
713        """
714        Load a file from the workspace s3 bucket and returns its content as bytes.
715
716        '''python
717        from wmill import S3Object
718
719        s3_obj = S3Object(s3="/path/to/my_file.txt")
720        my_obj_content = client.load_s3_file(s3_obj)
721        file_content = my_obj_content.decode("utf-8")
722        '''
723        """
724        s3object = parse_s3_object(s3object)
725        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
726            return file_reader.read()

Load a file from the workspace S3 bucket and return its content as bytes.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
my_obj_content = client.load_s3_file(s3_obj)
file_content = my_obj_content.decode("utf-8")

def load_s3_file_reader( self, s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None) -> _io.BufferedReader:
728    def load_s3_file_reader(
729        self, s3object: S3Object | str, s3_resource_path: str | None
730    ) -> BufferedReader:
731        """
732        Load a file from the workspace s3 bucket and returns the bytes stream.
733
734        '''python
735        from wmill import S3Object
736
737        s3_obj = S3Object(s3="/path/to/my_file.txt")
738        with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
739            print(file_reader.read())
740        '''
741        """
742        s3object = parse_s3_object(s3object)
743        reader = S3BufferedReader(
744            f"{self.workspace}",
745            self.client,
746            s3object["s3"],
747            s3_resource_path,
748            s3object["storage"] if "storage" in s3object else None,
749        )
750        return reader

Load a file from the workspace S3 bucket and return it as a bytes stream.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
with client.load_s3_file_reader(s3_obj, None) as file_reader:
    print(file_reader.read())

def write_s3_file( self, s3object: wmill.s3_types.S3Object | str | None, file_content: _io.BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> wmill.s3_types.S3Object:
752    def write_s3_file(
753        self,
754        s3object: S3Object | str | None,
755        file_content: BufferedReader | bytes,
756        s3_resource_path: str | None,
757        content_type: str | None = None,
758        content_disposition: str | None = None,
759    ) -> S3Object:
760        """
761        Write a file to the workspace S3 bucket
762
763        '''python
764        from wmill import S3Object
765
766        s3_obj = S3Object(s3="/path/to/my_file.txt")
767
768        # for an in memory bytes array:
769        file_content = b'Hello Windmill!'
770        client.write_s3_file(s3_obj, file_content)
771
772        # for a file:
773        with open("my_file.txt", "rb") as my_file:
774            client.write_s3_file(s3_obj, my_file)
775        '''
776        """
777        s3object = parse_s3_object(s3object)
778        # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator
779        if isinstance(file_content, BufferedReader):
780            content_payload = bytes_generator(file_content)
781        elif isinstance(file_content, bytes):
782            content_payload = file_content
783        else:
784            raise Exception("Type of file_content not supported")
785
786        query_params = {}
787        if s3object is not None and s3object["s3"] != "":
788            query_params["file_key"] = s3object["s3"]
789        if s3_resource_path is not None and s3_resource_path != "":
790            query_params["s3_resource_path"] = s3_resource_path
791        if (
792            s3object is not None
793            and "storage" in s3object
794            and s3object["storage"] is not None
795        ):
796            query_params["storage"] = s3object["storage"]
797        if content_type is not None:
798            query_params["content_type"] = content_type
799        if content_disposition is not None:
800            query_params["content_disposition"] = content_disposition
801
802        try:
803            # need a vanilla client b/c content-type is not application/json here
804            response = httpx.post(
805                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
806                headers={
807                    "Authorization": f"Bearer {self.token}",
808                    "Content-Type": "application/octet-stream",
809                },
810                params=query_params,
811                content=content_payload,
812                verify=self.verify,
813                timeout=None,
814            ).json()
815        except Exception as e:
816            raise Exception("Could not write file to S3") from e
817        return S3Object(s3=response["file_key"])

Write a file to the workspace S3 bucket

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")

# for an in-memory bytes array:
file_content = b'Hello Windmill!'
client.write_s3_file(s3_obj, file_content)

# for a file:
with open("my_file.txt", "rb") as my_file:
    client.write_s3_file(s3_obj, my_file)

def sign_s3_objects( self, s3_objects: list[wmill.s3_types.S3Object | str]) -> list[wmill.s3_types.S3Object]:
819    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
820        return self.post(
821            f"/w/{self.workspace}/apps/sign_s3_objects", json={"s3_objects": list(map(parse_s3_object, s3_objects))}
822        ).json()
def sign_s3_object( self, s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
824    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
825        return self.post(
826            f"/w/{self.workspace}/apps/sign_s3_objects",
827            json={"s3_objects": [s3_object]},
828        ).json()[0]
def get_presigned_s3_public_urls( self, s3_objects: list[wmill.s3_types.S3Object | str], base_url: str | None = None) -> list[str]:
830    def get_presigned_s3_public_urls(
831        self,
832        s3_objects: list[S3Object | str],
833        base_url: str | None = None,
834    ) -> list[str]:
835        """
836        Generate presigned public URLs for an array of S3 objects.
837        If an S3 object is not signed yet, it will be signed first.
838
839        Args:
840            s3_objects: List of S3 objects to sign
841            base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)
842
843        Returns:
844            List of signed public URLs
845
846        Example:
847            >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
848            >>> urls = client.get_presigned_s3_public_urls(s3_objs)
849        """
850        base_url = base_url or self._get_public_base_url()
851
852        s3_objs = [parse_s3_object(s3_obj) for s3_obj in s3_objects]
853
854        # Sign all S3 objects that need to be signed in one go
855        s3_objs_to_sign: list[tuple[S3Object, int]] = [
856            (s3_obj, index)
857            for index, s3_obj in enumerate(s3_objs)
858            if s3_obj.get("presigned") is None
859        ]
860
861        if s3_objs_to_sign:
862            print(f"Signing {len(s3_objs_to_sign)} S3 objects...")
863            signed_s3_objs = self.sign_s3_objects(
864                [s3_obj for s3_obj, _ in s3_objs_to_sign]
865            )
866            for i, (_, original_index) in enumerate(s3_objs_to_sign):
867                s3_objs[original_index] = parse_s3_object(signed_s3_objs[i])
868
869        signed_urls: list[str] = []
870        for s3_obj in s3_objs:
871            s3 = s3_obj.get("s3", "")
872            presigned = s3_obj.get("presigned", "")
873            storage = s3_obj.get("storage", "_default_")
874            signed_url = f"{base_url}/api/w/{self.workspace}/s3_proxy/{storage}/{s3}?{presigned}"
875            signed_urls.append(signed_url)
876
877        return signed_urls

Generate presigned public URLs for an array of S3 objects. If an S3 object is not signed yet, it will be signed first.

Args:
  • s3_objects: List of S3 objects to sign
  • base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

Returns: List of signed public URLs

Example:

s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
urls = client.get_presigned_s3_public_urls(s3_objs)

def get_presigned_s3_public_url( self, s3_object: wmill.s3_types.S3Object | str, base_url: str | None = None) -> str:
879    def get_presigned_s3_public_url(
880        self,
881        s3_object: S3Object | str,
882        base_url: str | None = None,
883    ) -> str:
884        """
885        Generate a presigned public URL for an S3 object.
886        If the S3 object is not signed yet, it will be signed first.
887
888        Args:
889            s3_object: S3 object to sign
890            base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)
891
892        Returns:
893            Signed public URL
894
895        Example:
896            >>> s3_obj = S3Object(s3="/path/to/file.txt")
897            >>> url = client.get_presigned_s3_public_url(s3_obj)
898        """
899        urls = self.get_presigned_s3_public_urls([s3_object], base_url)
900        return urls[0]

Generate a presigned public URL for an S3 object. If the S3 object is not signed yet, it will be signed first.

Args:
  • s3_object: S3 object to sign
  • base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

Returns: Signed public URL

Example:

s3_obj = S3Object(s3="/path/to/file.txt")
url = client.get_presigned_s3_public_url(s3_obj)

def whoami(self) -> dict:
921    def whoami(self) -> dict:
922        return self.get("/users/whoami").json()
user: dict
924    @property
925    def user(self) -> dict:
926        return self.whoami()
state_path: str
928    @property
929    def state_path(self) -> str:
930        state_path = os.environ.get(
931            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
932        )
933        if state_path is None:
934            raise Exception("State path not found")
935        return state_path
state: Any
937    @property
938    def state(self) -> Any:
939        return self.get_resource(path=self.state_path, none_if_undefined=True)
@staticmethod
def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None:
945    @staticmethod
946    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
947        """
948        Set the state in the shared folder using pickle
949        """
950        import pickle
951
952        with open(f"/shared/{path}", "wb") as handle:
953            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)

Set the state in the shared folder using pickle

@staticmethod
def get_shared_state_pickle(path: str = 'state.pickle') -> Any:
955    @staticmethod
956    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
957        """
958        Get the state in the shared folder using pickle
959        """
960        import pickle
961
962        with open(f"/shared/{path}", "rb") as handle:
963            return pickle.load(handle)

Get the state in the shared folder using pickle

@staticmethod
def set_shared_state(value: Any, path: str = 'state.json') -> None:
965    @staticmethod
966    def set_shared_state(value: Any, path: str = "state.json") -> None:
967        """
968        Set the state in the shared folder as JSON
969        """
970        import json
971
972        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
973            json.dump(value, f, ensure_ascii=False, indent=4)

Set the state in the shared folder as JSON

@staticmethod
def get_shared_state(path: str = 'state.json') -> Any:
975    @staticmethod
976    def get_shared_state(path: str = "state.json") -> Any:
977        """
978        Get the state in the shared folder as JSON
979        """
980        import json
981
982        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
983            return json.load(f)

Get the state in the shared folder as JSON

def get_resume_urls(self, approver: str = None) -> dict:
985    def get_resume_urls(self, approver: str = None) -> dict:
986        nonce = random.randint(0, 1000000000)
987        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
988        return self.get(
989            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
990            params={"approver": approver},
991        ).json()
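
A sketch of fetching the approval URLs for the current suspended step; the returned key names are an assumption (the Windmill API typically returns "resume", "cancel" and "approvalPage" links):

from wmill.client import Windmill

client = Windmill()
urls = client.get_resume_urls(approver="alex")  # approver label is arbitrary
print(urls.get("approvalPage"))
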
def request_interactive_slack_approval( self, slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None:
 993    def request_interactive_slack_approval(
 994        self,
 995        slack_resource_path: str,
 996        channel_id: str,
 997        message: str = None,
 998        approver: str = None,
 999        default_args_json: dict = None,
1000        dynamic_enums_json: dict = None,
1001    ) -> None:
1002        """
1003        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
1004
1005        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
1006        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
1007
1008        :param slack_resource_path: The path to the Slack resource in Windmill.
1009        :type slack_resource_path: str
1010        :param channel_id: The Slack channel ID where the approval request will be sent.
1011        :type channel_id: str
1012        :param message: Optional custom message to include in the Slack approval request.
1013        :type message: str, optional
1014        :param approver: Optional user ID or name of the approver for the request.
1015        :type approver: str, optional
1016        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
1017        :type default_args_json: dict, optional
1018        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
1019        :type dynamic_enums_json: dict, optional
1020
1021        :raises Exception: If the function is not called within a flow or flow preview.
1022        :raises Exception: If the required flow job or flow step environment variables are not set.
1023
1024        :return: None
1025
1026        **Usage Example:**
1027            >>> client.request_interactive_slack_approval(
1028            ...     slack_resource_path="/u/alex/my_slack_resource",
1029            ...     channel_id="admins-slack-channel",
1030            ...     message="Please approve this request",
1031            ...     approver="approver123",
1032            ...     default_args_json={"key1": "value1", "key2": 42},
1033            ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
1034            ... )
1035
1036        **Notes:**
1037        - This function must be executed within a Windmill flow or flow preview.
1038        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
1039        """
1040        workspace = self.workspace
1041        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")
1042
1043        if not flow_job_id:
1044            raise Exception(
1045                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
1046            )
1047
1048        # Only include non-empty parameters
1049        params = {}
1050        if message:
1051            params["message"] = message
1052        if approver:
1053            params["approver"] = approver
1054        if slack_resource_path:
1055            params["slack_resource_path"] = slack_resource_path
1056        if channel_id:
1057            params["channel_id"] = channel_id
1058        if os.environ.get("WM_FLOW_STEP_ID"):
1059            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
1060        if default_args_json:
1061            params["default_args_json"] = json.dumps(default_args_json)
1062        if dynamic_enums_json:
1063            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)
1064
1065        self.get(
1066            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
1067            params=params,
1068        )

Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.

[Enterprise Edition Only] To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality. Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form

Parameters
  • slack_resource_path: The path to the Slack resource in Windmill.
  • channel_id: The Slack channel ID where the approval request will be sent.
  • message: Optional custom message to include in the Slack approval request.
  • approver: Optional user ID or name of the approver for the request.
  • default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
  • dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
Raises
  • Exception: If the function is not called within a flow or flow preview.
  • Exception: If the required flow job or flow step environment variables are not set.
Returns

None

Usage Example:

client.request_interactive_slack_approval(
    slack_resource_path="/u/alex/my_slack_resource",
    channel_id="admins-slack-channel",
    message="Please approve this request",
    approver="approver123",
    default_args_json={"key1": "value1", "key2": 42},
    dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
)

Notes:

  • This function must be executed within a Windmill flow or flow preview.
  • The function checks for required environment variables (WM_FLOW_JOB_ID, WM_FLOW_STEP_ID) to ensure it is run in the appropriate context.
def username_to_email(self, username: str) -> str:
1070    def username_to_email(self, username: str) -> str:
1071        """
1072        Get email from workspace username
1073        This method is particularly useful for apps that require the email address of the viewer.
1074        Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
1075        """
1076        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text

Get the email address for a workspace username. This method is particularly useful for apps that require the email address of the viewer: in the viewer context, WM_USERNAME is set to the username of the viewer, while WM_EMAIL is set to the email of the creator of the app.

def send_teams_message( self, conversation_id: str, text: str, success: bool = True, card_block: dict = None):
1078    def send_teams_message(
1079        self,
1080        conversation_id: str,
1081        text: str,
1082        success: bool = True,
1083        card_block: dict = None,
1084    ):
1085        """
1086        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
1087        """
1088        return self.post(
1089            f"/teams/activities",
1090            json={
1091                "conversation_id": conversation_id,
1092                "text": text,
1093                "success": success,
1094                "card_block": card_block,
1095            },
1096        )

Send a message to the Microsoft Teams conversation identified by conversation_id; success controls how the message is styled.
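
A minimal sketch; the conversation_id shown is illustrative only:

from wmill.client import Windmill

client = Windmill()
client.send_teams_message(
    conversation_id="19:abc123@thread.v2",  # hypothetical Teams conversation
    text="Nightly sync finished",
    success=True,
)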

def datatable(self, name: str = 'main'):
1098    def datatable(self, name: str = "main"):
1099        return DataTableClient(self, name)
def ducklake(self, name: str = 'main'):
1101    def ducklake(self, name: str = "main"):
1102        return DucklakeClient(self, name)
def init_global_client(f):
1106def init_global_client(f):
1107    @functools.wraps(f)
1108    def wrapper(*args, **kwargs):
1109        global _client
1110        if _client is None:
1111            _client = Windmill()
1112        return f(*args, **kwargs)
1113
1114    return wrapper
def deprecate(in_favor_of: str):
1117def deprecate(in_favor_of: str):
1118    def decorator(f):
1119        @functools.wraps(f)
1120        def wrapper(*args, **kwargs):
1121            warnings.warn(
1122                (
1123                    f"The '{f.__name__}' method is deprecated and may be removed in the future. "
1124                    f"Consider {in_favor_of}"
1125                ),
1126                DeprecationWarning,
1127            )
1128            return f(*args, **kwargs)
1129
1130        return wrapper
1131
1132    return decorator
@init_global_client
def get_workspace() -> str:
1135@init_global_client
1136def get_workspace() -> str:
1137    return _client.workspace
@init_global_client
def get_root_job_id(job_id: str | None = None) -> str:
1140@init_global_client
1141def get_root_job_id(job_id: str | None = None) -> str:
1142    return _client.get_root_job_id(job_id)
@init_global_client
@deprecate('Windmill().version')
def get_version() -> str:
1145@init_global_client
1146@deprecate("Windmill().version")
1147def get_version() -> str:
1148    return _client.version
@init_global_client
def run_script_async( hash_or_path: str, args: Dict[str, Any] = None, scheduled_in_secs: int = None) -> str:
1151@init_global_client
1152def run_script_async(
1153    hash_or_path: str,
1154    args: Dict[str, Any] = None,
1155    scheduled_in_secs: int = None,
1156) -> str:
1157    is_path = "/" in hash_or_path
1158    hash_ = None if is_path else hash_or_path
1159    path = hash_or_path if is_path else None
1160    return _client.run_script_async(
1161        hash_=hash_,
1162        path=path,
1163        args=args,
1164        scheduled_in_secs=scheduled_in_secs,
1165    )
@init_global_client
def run_flow_async( path: str, args: Dict[str, Any] = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str:
1168@init_global_client
1169def run_flow_async(
1170    path: str,
1171    args: Dict[str, Any] = None,
1172    scheduled_in_secs: int = None,
1173    # can only be set to False if the job will be fully awaited and not run concurrently
1174    # with any other job, as otherwise the child flow and its own children will store
1175    # their state in the parent job, which leads to incorrectness and failures
1176    do_not_track_in_parent: bool = True,
1177) -> str:
1178    return _client.run_flow_async(
1179        path=path,
1180        args=args,
1181        scheduled_in_secs=scheduled_in_secs,
1182        do_not_track_in_parent=do_not_track_in_parent,
1183    )
@init_global_client
def run_script_sync( hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: datetime.timedelta = None) -> Any:
1186@init_global_client
1187def run_script_sync(
1188    hash: str,
1189    args: Dict[str, Any] = None,
1190    verbose: bool = False,
1191    assert_result_is_not_none: bool = True,
1192    cleanup: bool = True,
1193    timeout: dt.timedelta = None,
1194) -> Any:
1195    return _client.run_script(
1196        hash_=hash,
1197        args=args,
1198        verbose=verbose,
1199        assert_result_is_not_none=assert_result_is_not_none,
1200        cleanup=cleanup,
1201        timeout=timeout,
1202    )
@init_global_client
def run_script_by_path_async( path: str, args: Dict[str, Any] = None, scheduled_in_secs: Optional[int] = None) -> str:
1205@init_global_client
1206def run_script_by_path_async(
1207    path: str,
1208    args: Dict[str, Any] = None,
1209    scheduled_in_secs: Union[None, int] = None,
1210) -> str:
1211    return _client.run_script_by_path_async(
1212        path=path,
1213        args=args,
1214        scheduled_in_secs=scheduled_in_secs,
1215    )
@init_global_client
def run_script_by_hash_async( hash_: str, args: Dict[str, Any] = None, scheduled_in_secs: Optional[int] = None) -> str:
1218@init_global_client
1219def run_script_by_hash_async(
1220    hash_: str,
1221    args: Dict[str, Any] = None,
1222    scheduled_in_secs: Union[None, int] = None,
1223) -> str:
1224    return _client.run_script_by_hash_async(
1225        hash_=hash_,
1226        args=args,
1227        scheduled_in_secs=scheduled_in_secs,
1228    )
@init_global_client
def run_script_by_path_sync( path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: datetime.timedelta = None) -> Any:
1231@init_global_client
1232def run_script_by_path_sync(
1233    path: str,
1234    args: Dict[str, Any] = None,
1235    verbose: bool = False,
1236    assert_result_is_not_none: bool = True,
1237    cleanup: bool = True,
1238    timeout: dt.timedelta = None,
1239) -> Any:
1240    return _client.run_script(
1241        path=path,
1242        args=args,
1243        verbose=verbose,
1244        assert_result_is_not_none=assert_result_is_not_none,
1245        cleanup=cleanup,
1246        timeout=timeout,
1247    )
@init_global_client
def get_id_token(audience: str) -> str:
1250@init_global_client
1251def get_id_token(audience: str) -> str:
1252    """
1253    Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc.
1254    """
1255    return _client.get_id_token(audience)

Get a JWT token for the given audience, for OIDC purposes, to log in to third parties like AWS, Vault, GCP, etc.
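
A hedged sketch using the module-level helper; the "aws" audience is hypothetical, and exchanging the JWT with the third party's auth endpoint is out of scope here:

import wmill

jwt = wmill.get_id_token("aws")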

@init_global_client
def get_job_status(job_id: str) -> Literal['RUNNING', 'WAITING', 'COMPLETED']:
1258@init_global_client
1259def get_job_status(job_id: str) -> JobStatus:
1260    return _client.get_job_status(job_id)
@init_global_client
def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
1263@init_global_client
1264def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
1265    return _client.get_result(
1266        job_id=job_id, assert_result_is_not_none=assert_result_is_not_none
1267    )
@init_global_client
def duckdb_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.DuckDbConnectionSettings:
1270@init_global_client
1271def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings:
1272    """
1273    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1274    initiate an S3 connection from DuckDB
1275    """
1276    return _client.get_duckdb_connection_settings(s3_resource_path)

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB.

@init_global_client
def polars_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.PolarsConnectionSettings:
1279@init_global_client
1280def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings:
1281    """
1282    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1283    initiate an S3 connection from Polars
1284    """
1285    return _client.get_polars_connection_settings(s3_resource_path)

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars.

@init_global_client
def boto3_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.Boto3ConnectionSettings:
1288@init_global_client
1289def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings:
1290    """
1291    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1292    initiate an S3 connection using boto3
1293    """
1294    return _client.get_boto3_connection_settings(s3_resource_path)

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3.

@init_global_client
def load_s3_file( s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None = None) -> bytes:
1297@init_global_client
1298def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes:
1299    """
1300    Load the entire content of a file stored in S3 as bytes
1301    """
1302    return _client.load_s3_file(
1303        s3object, s3_resource_path if s3_resource_path != "" else None
1304    )

Load the entire content of a file stored in S3 as bytes

@init_global_client
def load_s3_file_reader( s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None = None) -> _io.BufferedReader:
1307@init_global_client
1308def load_s3_file_reader(
1309    s3object: S3Object | str, s3_resource_path: str | None = None
1310) -> BufferedReader:
1311    """
1312    Load the content of a file stored in S3
1313    """
1314    return _client.load_s3_file_reader(
1315        s3object, s3_resource_path if s3_resource_path != "" else None
1316    )

Load the content of a file stored in S3

@init_global_client
def write_s3_file(s3object: wmill.s3_types.S3Object | str | None, file_content: _io.BufferedReader | bytes, s3_resource_path: str | None = None, content_type: str | None = None, content_disposition: str | None = None) -> wmill.s3_types.S3Object:
@init_global_client
def write_s3_file(
    s3object: S3Object | str | None,
    file_content: BufferedReader | bytes,
    s3_resource_path: str | None = None,
    content_type: str | None = None,
    content_disposition: str | None = None,
) -> S3Object:
    """
    Upload a file to S3

    Content type will be automatically guessed from the path extension if left empty

    See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
    and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
    """
    return _client.write_s3_file(
        s3object,
        file_content,
        s3_resource_path if s3_resource_path != "" else None,
        content_type,
        content_disposition,
    )

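A short sketch of a write followed by a read-back (the key is illustrative):

>>> import wmill
>>> from wmill import S3Object
>>> obj = wmill.write_s3_file(S3Object(s3="reports/out.csv"), b"a,b\n1,2\n")  # hypothetical key
>>> wmill.load_s3_file(obj)
b'a,b\n1,2\n'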

@init_global_client
def sign_s3_objects(s3_objects: list[wmill.s3_types.S3Object | str]) -> list[wmill.s3_types.S3Object]:
@init_global_client
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]:
    """
    Sign S3 objects to be used by anonymous users in public apps
    Returns a list of signed s3 tokens
    """
    return _client.sign_s3_objects(s3_objects)

@init_global_client
def sign_s3_object(s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
@init_global_client
def sign_s3_object(s3_object: S3Object | str) -> S3Object:
    """
    Sign S3 object to be used by anonymous users in public apps
    Returns a signed s3 object
    """
    return _client.sign_s3_object(s3_object)

@init_global_client
def get_presigned_s3_public_urls(s3_objects: list[wmill.s3_types.S3Object | str], base_url: str | None = None) -> list[str]:
@init_global_client
def get_presigned_s3_public_urls(
    s3_objects: list[S3Object | str],
    base_url: str | None = None,
) -> list[str]:
    """
    Generate presigned public URLs for an array of S3 objects.
    If an S3 object is not signed yet, it will be signed first.

    Args:
        s3_objects: List of S3 objects to sign
        base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

    Returns:
        List of signed public URLs

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
        >>> urls = wmill.get_presigned_s3_public_urls(s3_objs)
    """
    return _client.get_presigned_s3_public_urls(s3_objects, base_url)

@init_global_client
def get_presigned_s3_public_url(s3_object: wmill.s3_types.S3Object | str, base_url: str | None = None) -> str:
@init_global_client
def get_presigned_s3_public_url(
    s3_object: S3Object | str,
    base_url: str | None = None,
) -> str:
    """
    Generate a presigned public URL for an S3 object.
    If the S3 object is not signed yet, it will be signed first.

    Args:
        s3_object: S3 object to sign
        base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

    Returns:
        Signed public URL

    Example:
        >>> import wmill
        >>> from wmill import S3Object
        >>> s3_obj = S3Object(s3="/path/to/file.txt")
        >>> url = wmill.get_presigned_s3_public_url(s3_obj)
    """
    return _client.get_presigned_s3_public_url(s3_object, base_url)

@init_global_client
def whoami() -> dict:
@init_global_client
def whoami() -> dict:
    """
    Returns the current user
    """
    return _client.user

@init_global_client
@deprecate('Windmill().state')
def get_state() -> Any:
@init_global_client
@deprecate("Windmill().state")
def get_state() -> Any:
    """
    Get the state
    """
    return _client.state

@init_global_client
def get_resource(path: str, none_if_undefined: bool = False) -> dict | None:
@init_global_client
def get_resource(
    path: str,
    none_if_undefined: bool = False,
) -> dict | None:
    """Get resource from Windmill"""
    return _client.get_resource(path, none_if_undefined)

@init_global_client
def set_resource(path: str, value: Any, resource_type: str = 'any') -> None:
@init_global_client
def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
    """
    Set the resource at a given path as a string, creating it if it does not exist
    """
    return _client.set_resource(value=value, path=path, resource_type=resource_type)

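For example (path and value are illustrative):

>>> import wmill
>>> wmill.set_resource("u/user/my_config", {"host": "db.example.com"})  # hypothetical path
>>> wmill.get_resource("u/user/my_config")
{'host': 'db.example.com'}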

@init_global_client
def list_resources(resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]:
@init_global_client
def list_resources(
    resource_type: str = None,
    page: int = None,
    per_page: int = None,
) -> list[dict]:
    """List resources from Windmill workspace.

    Args:
        resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
        page: Optional page number for pagination
        per_page: Optional number of results per page

    Returns:
        List of resource dictionaries

    Example:
        >>> # Get all resources
        >>> all_resources = wmill.list_resources()

        >>> # Get only PostgreSQL resources
        >>> pg_resources = wmill.list_resources(resource_type="postgresql")
    """
    return _client.list_resources(
        resource_type=resource_type,
        page=page,
        per_page=per_page,
    )

@init_global_client
def set_state(value: Any) -> None:
@init_global_client
def set_state(value: Any) -> None:
    """
    Set the state
    """
    return _client.set_state(value)

@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
    """
    Set the progress
    """
    return _client.set_progress(value, job_id)

@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
    """
    Get the progress
    """
    return _client.get_progress(job_id)

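For example, from inside a running job:

>>> import wmill
>>> wmill.set_progress(50)
>>> wmill.get_progress()  # returns the stored value, e.g. 50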

def set_shared_state_pickle(value: Any, path='state.pickle') -> None:
def set_shared_state_pickle(value: Any, path="state.pickle") -> None:
    """
    Set the state in the shared folder using pickle
    """
    return Windmill.set_shared_state_pickle(value=value, path=path)

@deprecate('Windmill.get_shared_state_pickle(...)')
def get_shared_state_pickle(path='state.pickle') -> Any:
@deprecate("Windmill.get_shared_state_pickle(...)")
def get_shared_state_pickle(path="state.pickle") -> Any:
    """
    Get the state in the shared folder using pickle
    """
    return Windmill.get_shared_state_pickle(path=path)

def set_shared_state(value: Any, path='state.json') -> None:
def set_shared_state(value: Any, path="state.json") -> None:
    """
    Set the state in the shared folder using JSON
    """
    return Windmill.set_shared_state(value=value, path=path)

def get_shared_state(path='state.json') -> Any:
def get_shared_state(path="state.json") -> Any:
    """
    Get the state in the shared folder using JSON
    """
    return Windmill.get_shared_state(path=path)

@init_global_client
def get_variable(path: str) -> str:
@init_global_client
def get_variable(path: str) -> str:
    """
    Returns the variable at a given path as a string
    """
    return _client.get_variable(path)

@init_global_client
def set_variable(path: str, value: str, is_secret: bool = False) -> None:
@init_global_client
def set_variable(path: str, value: str, is_secret: bool = False) -> None:
    """
    Set the variable at a given path as a string, creating it if it does not exist
    """
    return _client.set_variable(path, value, is_secret)

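For example (the path is illustrative):

>>> import wmill
>>> wmill.set_variable("u/user/my_secret", "s3cr3t", is_secret=True)  # hypothetical path
>>> wmill.get_variable("u/user/my_secret")
's3cr3t'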

@init_global_client
def get_flow_user_state(key: str) -> Any:
@init_global_client
def get_flow_user_state(key: str) -> Any:
    """
    Get the user state of a flow at a given key
    """
    return _client.get_flow_user_state(key)

@init_global_client
def set_flow_user_state(key: str, value: Any) -> None:
@init_global_client
def set_flow_user_state(key: str, value: Any) -> None:
    """
    Set the user state of a flow at a given key
    """
    return _client.set_flow_user_state(key, value)

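For example, persisting a cursor across runs of the same flow (the key is illustrative):

>>> import wmill
>>> wmill.set_flow_user_state("last_processed_id", 42)  # hypothetical key
>>> wmill.get_flow_user_state("last_processed_id")
42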

@init_global_client
def get_state_path() -> str:
@init_global_client
def get_state_path() -> str:
    return _client.state_path
@init_global_client
def get_resume_urls(approver: str = None) -> dict:
@init_global_client
def get_resume_urls(approver: str = None) -> dict:
    return _client.get_resume_urls(approver)
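
A sketch of fetching the URLs from a suspend/approval step; the exact keys of the returned dict are an assumption to verify against the API:

>>> import wmill
>>> urls = wmill.get_resume_urls()
>>> urls["resume"]  # assumed key: URL that resumes the suspended flow step
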
@init_global_client
def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None:
@init_global_client
def request_interactive_slack_approval(
    slack_resource_path: str,
    channel_id: str,
    message: str = None,
    approver: str = None,
    default_args_json: dict = None,
    dynamic_enums_json: dict = None,
) -> None:
    return _client.request_interactive_slack_approval(
        slack_resource_path=slack_resource_path,
        channel_id=channel_id,
        message=message,
        approver=approver,
        default_args_json=default_args_json,
        dynamic_enums_json=dynamic_enums_json,
    )
@init_global_client
def send_teams_message(conversation_id: str, text: str, success: bool, card_block: dict = None):
@init_global_client
def send_teams_message(
    conversation_id: str, text: str, success: bool, card_block: dict = None
):
    return _client.send_teams_message(conversation_id, text, success, card_block)
@init_global_client
def cancel_job(job_id: str, reason: str = None) -> str:
@init_global_client
def cancel_job(job_id: str, reason: str = None) -> str:
    """Cancel a specific job by ID.

    Args:
        job_id: UUID of the job to cancel
        reason: Optional reason for cancellation

    Returns:
        Response message from the cancel endpoint
    """
    return _client.cancel_job(job_id, reason)

@init_global_client
def cancel_running() -> dict:
@init_global_client
def cancel_running() -> dict:
    """Cancel currently running executions of the same script."""
    return _client.cancel_running()

@init_global_client
def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
@init_global_client
def run_script(
    path: str = None,
    hash_: str = None,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script synchronously and return its result.

    .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
    """
    return _client.run_script(
        path=path,
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )

@init_global_client
def run_script_by_path(path: str, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
@init_global_client
def run_script_by_path(
    path: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by path synchronously and return its result."""
    return _client.run_script_by_path(
        path=path,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )

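For example (script path and arguments are illustrative):

>>> import wmill
>>> result = wmill.run_script_by_path(
...     "u/user/my_script",   # hypothetical path
...     args={"name": "world"},
...     timeout=60,           # seconds; a dt.timedelta also works
... )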

@init_global_client
def run_script_by_hash(hash_: str, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
@init_global_client
def run_script_by_hash(
    hash_: str,
    args: dict = None,
    timeout: dt.timedelta | int | float = None,
    verbose: bool = False,
    cleanup: bool = True,
    assert_result_is_not_none: bool = True,
) -> Any:
    """Run script by hash synchronously and return its result."""
    return _client.run_script_by_hash(
        hash_=hash_,
        args=args,
        verbose=verbose,
        assert_result_is_not_none=assert_result_is_not_none,
        cleanup=cleanup,
        timeout=timeout,
    )

@init_global_client
def run_inline_script_preview(content: str, language: str, args: dict = None) -> Any:
@init_global_client
def run_inline_script_preview(
    content: str,
    language: str,
    args: dict = None,
) -> Any:
    """Run a script on the current worker without creating a job"""
    return _client.run_inline_script_preview(
        content=content,
        language=language,
        args=args,
    )

@init_global_client
def username_to_email(username: str) -> str:
@init_global_client
def username_to_email(username: str) -> str:
    """
    Get the email address for a workspace username.
    This is particularly useful for apps that need the email of the viewer:
    in the viewer context, WM_USERNAME is set to the username of the viewer,
    but WM_EMAIL is set to the email of the creator of the app.
    """
    return _client.username_to_email(username)

@init_global_client
def datatable(name: str = 'main') -> DataTableClient:
@init_global_client
def datatable(name: str = "main") -> DataTableClient:
    return _client.datatable(name)
@init_global_client
def ducklake(name: str = 'main') -> DucklakeClient:
@init_global_client
def ducklake(name: str = "main") -> DucklakeClient:
    return _client.ducklake(name)
def task(*args, **kwargs):
def task(*args, **kwargs):
    from inspect import signature

    def f(func, tag: str | None = None):
        # Outside a Windmill job (or when this function is the overridden main),
        # just call the function directly.
        if (
            os.environ.get("WM_JOB_ID") is None
            or os.environ.get("MAIN_OVERRIDE") == func.__name__
        ):

            def inner(*args, **kwargs):
                return func(*args, **kwargs)

            return inner
        else:
            # Inside a job: submit the function as a workflow-as-code sub-job
            # and wait for its result.
            def inner(*args, **kwargs):
                global _client
                if _client is None:
                    _client = Windmill()
                w_id = os.environ.get("WM_WORKSPACE")
                job_id = os.environ.get("WM_JOB_ID")
                f_name = func.__name__
                json = kwargs
                # Map positional arguments onto the function's parameter names.
                params = list(signature(func).parameters)
                for i, arg in enumerate(args):
                    if i < len(params):
                        p = params[i]
                        key = p
                        if key not in kwargs:
                            json[key] = arg

                params = {}
                if tag is not None:
                    params["tag"] = tag
                w_as_code_response = _client.post(
                    f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}",
                    json={"args": json},
                    params=params,
                )
                job_id = w_as_code_response.text
                print(f"Executing task {func.__name__} on job {job_id}")
                job_result = _client.wait_job(job_id)
                print(f"Task {func.__name__} ({job_id}) completed")
                return job_result

            return inner

    # Support both bare @task and parameterized @task(tag=...).
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        return f(args[0], None)
    else:
        return lambda x: f(x, kwargs.get("tag"))
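
A hedged sketch of how the decorator is typically applied in a workflow-as-code script (the function bodies are illustrative, and the top-level re-export of `task` is assumed):

>>> from wmill import task
>>> @task
... def double(x: int) -> int:
...     return x * 2
>>> @task(tag="gpu")  # optional: route the sub-job to workers with a specific tag
... def train(epochs: int) -> None:
...     ...
>>> double(21)  # executed as a separate Windmill job when called inside a job context
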
def parse_resource_syntax(s: str) -> Optional[str]:
def parse_resource_syntax(s: str) -> Optional[str]:
    """Parse a resource path out of "$res:" / "res://" syntax, or return None."""
    if s is None:
        return None
    if s.startswith("$res:"):
        return s[5:]
    if s.startswith("res://"):
        return s[6:]
    return None

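For example:

>>> parse_resource_syntax("res://u/user/my_resource")
'u/user/my_resource'
>>> parse_resource_syntax("$res:u/user/my_resource")
'u/user/my_resource'
>>> parse_resource_syntax("plain string") is None
True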

def parse_s3_object(s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
def parse_s3_object(s3_object: S3Object | str) -> S3Object:
    """Parse an S3 object from an "s3://bucket/key" string, or return it unchanged."""
    if isinstance(s3_object, str):
        match = re.match(r'^s3://([^/]*)/(.*)$', s3_object)
        if match:
            return S3Object(s3=match.group(2) or "", storage=match.group(1) or None)
        return S3Object(s3="")
    else:
        return s3_object

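For example (bucket and key are illustrative; this assumes S3Object can be read like a mapping):

>>> obj = parse_s3_object("s3://my-bucket/path/to/file.txt")
>>> obj["s3"], obj["storage"]
('path/to/file.txt', 'my-bucket')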

def parse_variable_syntax(s: str) -> Optional[str]:
def parse_variable_syntax(s: str) -> Optional[str]:
    """Parse a variable path out of "var://" syntax, or return None."""
    if s.startswith("var://"):
        return s[6:]
    return None

def append_to_result_stream(text: str) -> None:
def append_to_result_stream(text: str) -> None:
    """Append text to the result stream.

    Args:
        text: text to append to the result stream
    """
    print("WM_STREAM: {}".format(text.replace(chr(10), '\\n')))

def stream_result(stream) -> None:
def stream_result(stream) -> None:
    """Forward each element of an iterable to the result stream.

    Args:
        stream: iterable of text chunks to append to the result stream
    """
    for text in stream:
        append_to_result_stream(text)

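For example, forwarding chunks from a generator (such as an LLM token stream):

>>> import wmill
>>> def chunks():
...     yield "Hello, "
...     yield "world"
>>> wmill.stream_result(chunks())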

class DataTableClient:
class DataTableClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, *args):
        # Declare positional args as "-- $1 arg1" header comments expected by the
        # Windmill SQL executor, then run the query as an inline postgresql preview.
        args_dict = {}
        args_def = ""
        for i, arg in enumerate(args):
            args_dict[f"arg{i+1}"] = arg
            args_def += f"-- ${i+1} arg{i+1}\n"
        sql = args_def + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="postgresql",
                args={"database": f"datatable://{self.name}", **args_dict},
            )
        )
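
A hedged sketch of querying a data table with positional parameters (table name and values are illustrative; `fetch`/`fetch_one` come from SqlQuery below):

>>> import wmill
>>> dt = wmill.datatable("main")
>>> adults = dt.query("SELECT * FROM users WHERE age >= $1", 18).fetch()  # hypothetical table
>>> total = dt.query("SELECT count(*) AS n FROM users").fetch_one()
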
class DucklakeClient:
class DucklakeClient:
    def __init__(self, client: Windmill, name: str):
        self.client = client
        self.name = name

    def query(self, sql: str, **kwargs):
        # Declare named args (with inferred SQL types) as header comments, attach
        # the ducklake catalog, then run the query as an inline duckdb preview.
        args_dict = {}
        args_def = ""
        for key, value in kwargs.items():
            args_dict[key] = value
            args_def += f"-- ${key} ({infer_sql_type(value)})\n"
        attach = f"ATTACH 'ducklake://{self.name}' AS dl;USE dl;\n"
        sql = args_def + attach + sql
        return SqlQuery(
            sql,
            lambda sql: self.client.run_inline_script_preview(
                content=sql,
                language="duckdb",
                args=args_dict,
            )
        )
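
The Ducklake counterpart uses named parameters instead (table name and values are illustrative):

>>> import wmill
>>> dl = wmill.ducklake("main")
>>> clicks = dl.query("SELECT * FROM events WHERE kind = $kind", kind="click").fetch()  # hypothetical table
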
class SqlQuery:
class SqlQuery:
    def __init__(self, sql: str, fetch_fn):
        self.sql = sql
        self.fetch_fn = fetch_fn

    def fetch(self, result_collection: str | None = None):
        sql = self.sql
        if result_collection is not None:
            sql = f'-- result_collection={result_collection}\n{sql}'
        return self.fetch_fn(sql)

    def fetch_one(self):
        return self.fetch(result_collection="last_statement_first_row")

def infer_sql_type(value) -> str:
def infer_sql_type(value) -> str:
    """
    The DuckDB executor requires explicit argument types at declaration.
    These types exist in both DuckDB and Postgres; check that the types
    exist if you plan to extend this function for other SQL engines.
    """
    if isinstance(value, bool):
        # Check bool before int since bool is a subclass of int in Python
        return "BOOLEAN"
    elif isinstance(value, int):
        return "BIGINT"
    elif isinstance(value, float):
        return "DOUBLE PRECISION"
    elif value is None:
        return "TEXT"
    elif isinstance(value, str):
        return "TEXT"
    elif isinstance(value, dict) or isinstance(value, list):
        return "JSON"
    else:
        return "TEXT"

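For example:

>>> infer_sql_type(True), infer_sql_type(3), infer_sql_type(2.5)
('BOOLEAN', 'BIGINT', 'DOUBLE PRECISION')
>>> infer_sql_type({"a": 1})
'JSON'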