wmill.client

   1from __future__ import annotations
   2
   3import atexit
   4import datetime as dt
   5import functools
   6from io import BufferedReader, BytesIO
   7import logging
   8import os
   9import random
  10import time
  11import warnings
  12import json
  13from json import JSONDecodeError
  14from typing import Dict, Any, Union, Literal, Optional
  15import re
  16
  17import httpx
  18
  19from .s3_reader import S3BufferedReader, bytes_generator
  20from .s3_types import (
  21    Boto3ConnectionSettings,
  22    DuckDbConnectionSettings,
  23    PolarsConnectionSettings,
  24    S3Object,
  25)
  26
  27_client: "Windmill | None" = None
  28
  29logger = logging.getLogger("windmill_client")
  30
  31JobStatus = Literal["RUNNING", "WAITING", "COMPLETED"]
  32
  33
  34class Windmill:
  35    def __init__(self, base_url=None, token=None, workspace=None, verify=True):
  36        base = (
  37            base_url
  38            or os.environ.get("BASE_INTERNAL_URL")
  39            or os.environ.get("WM_BASE_URL")
  40        )
  41
  42        self.base_url = f"{base}/api"
  43        self.token = token or os.environ.get("WM_TOKEN")
  44        self.headers = {
  45            "Content-Type": "application/json",
  46            "Authorization": f"Bearer {self.token}",
  47        }
  48        self.verify = verify
  49        self.client = self.get_client()
  50        self.workspace = workspace or os.environ.get("WM_WORKSPACE")
  51        self.path = os.environ.get("WM_JOB_PATH")
  52
  53        self.mocked_api = self.get_mocked_api()
  54
  55        assert self.workspace, (
  56            f"workspace required as an argument or as WM_WORKSPACE environment variable"
  57        )
  58
  59    def get_mocked_api(self) -> Optional[dict]:
  60        mocked_path = os.environ.get("WM_MOCKED_API_FILE")
  61        if not mocked_path:
  62            return None
  63        logger.info("Using mocked API from %s", mocked_path)
  64        mocked_api = {"variables": {}, "resources": {}}
  65        try:
  66            with open(mocked_path, "r") as f:
  67                incoming_mocked_api = json.load(f)
  68            mocked_api = {**mocked_api, **incoming_mocked_api}
  69        except Exception as e:
  70            logger.warning(
  71                "Error parsing mocked API file at path %s Using empty mocked API.",
  72                mocked_path,
  73            )
  74            logger.debug(e)
  75        return mocked_api
  76
  77    def get_client(self) -> httpx.Client:
  78        return httpx.Client(
  79            base_url=self.base_url,
  80            headers=self.headers,
  81            verify=self.verify,
  82        )
  83
  84    def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
  85        endpoint = endpoint.lstrip("/")
  86        resp = self.client.get(f"/{endpoint}", **kwargs)
  87        if raise_for_status:
  88            try:
  89                resp.raise_for_status()
  90            except httpx.HTTPStatusError as err:
  91                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
  92                logger.error(error)
  93                raise Exception(error)
  94        return resp
  95
  96    def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
  97        endpoint = endpoint.lstrip("/")
  98        resp = self.client.post(f"/{endpoint}", **kwargs)
  99        if raise_for_status:
 100            try:
 101                resp.raise_for_status()
 102            except httpx.HTTPStatusError as err:
 103                error = f"{err.request.url}: {err.response.status_code}, {err.response.text}"
 104                logger.error(error)
 105                raise Exception(error)
 106        return resp
 107
 108    def create_token(self, duration=dt.timedelta(days=1)) -> str:
 109        endpoint = "/users/tokens/create"
 110        payload = {
 111            "label": f"refresh {time.time()}",
 112            "expiration": (dt.datetime.now() + duration).strftime("%Y-%m-%dT%H:%M:%SZ"),
 113        }
 114        return self.post(endpoint, json=payload).text
 115
 116    def run_script_async(
 117        self,
 118        path: str = None,
 119        hash_: str = None,
 120        args: dict = None,
 121        scheduled_in_secs: int = None,
 122    ) -> str:
 123        """Create a script job and return its job id.
 124        
 125        .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
 126        """
 127        logging.warning(
 128            "run_script_async is deprecated. Use run_script_by_path_async or run_script_by_hash_async instead.",
 129        )
 130        assert not (path and hash_), "path and hash_ are mutually exclusive"
 131        return self._run_script_async_internal(path=path, hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
 132
 133    def _run_script_async_internal(
 134        self,
 135        path: str = None,
 136        hash_: str = None,
 137        args: dict = None,
 138        scheduled_in_secs: int = None,
 139    ) -> str:
 140        """Internal helper for running scripts asynchronously."""
 141        args = args or {}
 142        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
 143        if os.environ.get("WM_JOB_ID"):
 144            params["parent_job"] = os.environ.get("WM_JOB_ID")
 145        if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
 146            params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
 147        
 148        if path:
 149            endpoint = f"/w/{self.workspace}/jobs/run/p/{path}"
 150        elif hash_:
 151            endpoint = f"/w/{self.workspace}/jobs/run/h/{hash_}"
 152        else:
 153            raise Exception("path or hash_ must be provided")
 154        
 155        return self.post(endpoint, json=args, params=params).text
 156
 157    def run_script_by_path_async(
 158        self,
 159        path: str,
 160        args: dict = None,
 161        scheduled_in_secs: int = None,
 162    ) -> str:
 163        """Create a script job by path and return its job id."""
 164        return self._run_script_async_internal(path=path, args=args, scheduled_in_secs=scheduled_in_secs)
 165
 166    def run_script_by_hash_async(
 167        self,
 168        hash_: str,
 169        args: dict = None,
 170        scheduled_in_secs: int = None,
 171    ) -> str:
 172        """Create a script job by hash and return its job id."""
 173        return self._run_script_async_internal(hash_=hash_, args=args, scheduled_in_secs=scheduled_in_secs)
 174
 175    def run_flow_async(
 176        self,
 177        path: str,
 178        args: dict = None,
 179        scheduled_in_secs: int = None,
 180        # can only be set to false if this the job will be fully await and not concurrent with any other job
 181        # as otherwise the child flow and its own child will store their state in the parent job which will
 182        # lead to incorrectness and failures
 183        do_not_track_in_parent: bool = True,
 184    ) -> str:
 185        """Create a flow job and return its job id."""
 186        args = args or {}
 187        params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
 188        if not do_not_track_in_parent:
 189            if os.environ.get("WM_JOB_ID"):
 190                params["parent_job"] = os.environ.get("WM_JOB_ID")
 191            if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
 192                params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
 193        if path:
 194            endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
 195        else:
 196            raise Exception("path must be provided")
 197        return self.post(endpoint, json=args, params=params).text
 198
 199    def run_script(
 200        self,
 201        path: str = None,
 202        hash_: str = None,
 203        args: dict = None,
 204        timeout: dt.timedelta | int | float | None = None,
 205        verbose: bool = False,
 206        cleanup: bool = True,
 207        assert_result_is_not_none: bool = False,
 208    ) -> Any:
 209        """Run script synchronously and return its result.
 210        
 211        .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
 212        """
 213        logging.warning(
 214            "run_script is deprecated. Use run_script_by_path or run_script_by_hash instead.",
 215        )
 216        assert not (path and hash_), "path and hash_ are mutually exclusive"
 217        return self._run_script_internal(
 218            path=path, hash_=hash_, args=args, timeout=timeout, verbose=verbose,
 219            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 220        )
 221
 222    def _run_script_internal(
 223        self,
 224        path: str = None,
 225        hash_: str = None,
 226        args: dict = None,
 227        timeout: dt.timedelta | int | float | None = None,
 228        verbose: bool = False,
 229        cleanup: bool = True,
 230        assert_result_is_not_none: bool = False,
 231    ) -> Any:
 232        """Internal helper for running scripts synchronously."""
 233        args = args or {}
 234
 235        if verbose:
 236            if path:
 237                logger.info(f"running `{path}` synchronously with {args = }")
 238            elif hash_:
 239                logger.info(f"running script with hash `{hash_}` synchronously with {args = }")
 240
 241        if isinstance(timeout, dt.timedelta):
 242            timeout = timeout.total_seconds()
 243
 244        job_id = self._run_script_async_internal(path=path, hash_=hash_, args=args)
 245        return self.wait_job(
 246            job_id, timeout, verbose, cleanup, assert_result_is_not_none
 247        )
 248
 249    def run_script_by_path(
 250        self,
 251        path: str,
 252        args: dict = None,
 253        timeout: dt.timedelta | int | float | None = None,
 254        verbose: bool = False,
 255        cleanup: bool = True,
 256        assert_result_is_not_none: bool = False,
 257    ) -> Any:
 258        """Run script by path synchronously and return its result."""
 259        return self._run_script_internal(
 260            path=path, args=args, timeout=timeout, verbose=verbose,
 261            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 262        )
 263
 264    def run_script_by_hash(
 265        self,
 266        hash_: str,
 267        args: dict = None,
 268        timeout: dt.timedelta | int | float | None = None,
 269        verbose: bool = False,
 270        cleanup: bool = True,
 271        assert_result_is_not_none: bool = False,
 272    ) -> Any:
 273        """Run script by hash synchronously and return its result."""
 274        return self._run_script_internal(
 275            hash_=hash_, args=args, timeout=timeout, verbose=verbose,
 276            cleanup=cleanup, assert_result_is_not_none=assert_result_is_not_none
 277        )
 278
 279    def wait_job(
 280        self,
 281        job_id,
 282        timeout: dt.timedelta | int | float | None = None,
 283        verbose: bool = False,
 284        cleanup: bool = True,
 285        assert_result_is_not_none: bool = False,
 286    ):
 287        def cancel_job():
 288            logger.warning(f"cancelling job: {job_id}")
 289            self.post(
 290                f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
 291                json={"reason": "parent script cancelled"},
 292            ).raise_for_status()
 293
 294        if cleanup:
 295            atexit.register(cancel_job)
 296
 297        start_time = time.time()
 298
 299        if isinstance(timeout, dt.timedelta):
 300            timeout = timeout.total_seconds()
 301
 302        while True:
 303            result_res = self.get(
 304                f"/w/{self.workspace}/jobs_u/completed/get_result_maybe/{job_id}", True
 305            ).json()
 306
 307            started = result_res["started"]
 308            completed = result_res["completed"]
 309            success = result_res["success"]
 310
 311            if not started and verbose:
 312                logger.info(f"job {job_id} has not started yet")
 313
 314            if cleanup and completed:
 315                atexit.unregister(cancel_job)
 316
 317            if completed:
 318                result = result_res["result"]
 319                if success:
 320                    if result is None and assert_result_is_not_none:
 321                        raise Exception("Result was none")
 322                    return result
 323                else:
 324                    error = result["error"]
 325                    raise Exception(f"Job {job_id} was not successful: {str(error)}")
 326
 327            if timeout and ((time.time() - start_time) > timeout):
 328                msg = "reached timeout"
 329                logger.warning(msg)
 330                self.post(
 331                    f"/w/{self.workspace}/jobs_u/queue/cancel/{job_id}",
 332                    json={"reason": msg},
 333                )
 334                raise TimeoutError(msg)
 335            if verbose:
 336                logger.info(f"sleeping 0.5 seconds for {job_id = }")
 337
 338            time.sleep(0.5)
 339
 340    def cancel_running(self) -> dict:
 341        """Cancel currently running executions of the same script."""
 342        logger.info("canceling running executions of this script")
 343
 344        jobs = self.get(
 345            f"/w/{self.workspace}/jobs/list",
 346            params={
 347                "running": "true",
 348                "script_path_exact": self.path,
 349            },
 350        ).json()
 351
 352        current_job_id = os.environ.get("WM_JOB_ID")
 353
 354        logger.debug(f"{current_job_id = }")
 355
 356        job_ids = [j["id"] for j in jobs if j["id"] != current_job_id]
 357
 358        if job_ids:
 359            logger.info(f"cancelling the following job ids: {job_ids}")
 360        else:
 361            logger.info("no previous executions to cancel")
 362
 363        result = {}
 364
 365        for id_ in job_ids:
 366            result[id_] = self.post(
 367                f"/w/{self.workspace}/jobs_u/queue/cancel/{id_}",
 368                json={"reason": "killed by `cancel_running` method"},
 369            )
 370
 371        return result
 372
 373    def get_job(self, job_id: str) -> dict:
 374        return self.get(f"/w/{self.workspace}/jobs_u/get/{job_id}").json()
 375
 376    def get_root_job_id(self, job_id: str | None = None) -> dict:
 377        job_id = job_id or os.environ.get("WM_JOB_ID")
 378        return self.get(f"/w/{self.workspace}/jobs_u/get_root_job_id/{job_id}").json()
 379
 380    def get_id_token(self, audience: str) -> str:
 381        return self.post(f"/w/{self.workspace}/oidc/token/{audience}").text
 382
 383    def get_job_status(self, job_id: str) -> JobStatus:
 384        job = self.get_job(job_id)
 385        job_type = job.get("type", "")
 386        assert job_type, f"{job} is not a valid job"
 387        if job_type.lower() == "completedjob":
 388            return "COMPLETED"
 389        if job.get("running"):
 390            return "RUNNING"
 391        return "WAITING"
 392
 393    def get_result(
 394        self,
 395        job_id: str,
 396        assert_result_is_not_none: bool = True,
 397    ) -> Any:
 398        result = self.get(f"/w/{self.workspace}/jobs_u/completed/get_result/{job_id}")
 399        result_text = result.text
 400        if assert_result_is_not_none and result_text is None:
 401            raise Exception(f"result is None for {job_id = }")
 402        try:
 403            return result.json()
 404        except JSONDecodeError:
 405            return result_text
 406
 407    def get_variable(self, path: str) -> str:
 408        path = parse_variable_syntax(path) or path
 409        if self.mocked_api is not None:
 410            variables = self.mocked_api["variables"]
 411            try:
 412                result = variables[path]
 413                return result
 414            except KeyError:
 415                logger.info(
 416                    f"MockedAPI present, but variable not found at {path}, falling back to real API"
 417                )
 418
 419        """Get variable from Windmill"""
 420        return self.get(f"/w/{self.workspace}/variables/get_value/{path}").json()
 421
 422    def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
 423        path = parse_variable_syntax(path) or path
 424        if self.mocked_api is not None:
 425            self.mocked_api["variables"][path] = value
 426            return
 427
 428        """Set variable from Windmill"""
 429        # check if variable exists
 430        r = self.get(
 431            f"/w/{self.workspace}/variables/get/{path}", raise_for_status=False
 432        )
 433        if r.status_code == 404:
 434            # create variable
 435            self.post(
 436                f"/w/{self.workspace}/variables/create",
 437                json={
 438                    "path": path,
 439                    "value": value,
 440                    "is_secret": is_secret,
 441                    "description": "",
 442                },
 443            )
 444        else:
 445            # update variable
 446            self.post(
 447                f"/w/{self.workspace}/variables/update/{path}",
 448                json={"value": value},
 449            )
 450
 451    def get_resource(
 452        self,
 453        path: str,
 454        none_if_undefined: bool = False,
 455    ) -> dict | None:
 456        path = parse_resource_syntax(path) or path
 457        if self.mocked_api is not None:
 458            resources = self.mocked_api["resources"]
 459            try:
 460                result = resources[path]
 461                return result
 462            except KeyError:
 463                # NOTE: should mocked_api respect `none_if_undefined`?
 464                if none_if_undefined:
 465                    logger.info(
 466                        f"resource not found at ${path}, but none_if_undefined is True, so returning None"
 467                    )
 468                    return None
 469                logger.info(
 470                    f"MockedAPI present, but resource not found at ${path}, falling back to real API"
 471                )
 472
 473        """Get resource from Windmill"""
 474        try:
 475            return self.get(
 476                f"/w/{self.workspace}/resources/get_value_interpolated/{path}"
 477            ).json()
 478        except Exception as e:
 479            if none_if_undefined:
 480                return None
 481            logger.error(e)
 482            raise e
 483
 484    def set_resource(
 485        self,
 486        value: Any,
 487        path: str,
 488        resource_type: str,
 489    ):
 490        path = parse_resource_syntax(path) or path
 491        if self.mocked_api is not None:
 492            self.mocked_api["resources"][path] = value
 493            return
 494
 495        # check if resource exists
 496        r = self.get(
 497            f"/w/{self.workspace}/resources/get/{path}", raise_for_status=False
 498        )
 499        if r.status_code == 404:
 500            # create resource
 501            self.post(
 502                f"/w/{self.workspace}/resources/create",
 503                json={
 504                    "path": path,
 505                    "value": value,
 506                    "resource_type": resource_type,
 507                },
 508            )
 509        else:
 510            # update resource
 511            self.post(
 512                f"/w/{self.workspace}/resources/update_value/{path}",
 513                json={"value": value},
 514            )
 515
 516    def set_state(self, value: Any):
 517        self.set_resource(value, path=self.state_path, resource_type="state")
 518
 519    def set_progress(self, value: int, job_id: Optional[str] = None):
 520        workspace = get_workspace()
 521        flow_id = os.environ.get("WM_FLOW_JOB_ID")
 522        job_id = job_id or os.environ.get("WM_JOB_ID")
 523
 524        if job_id != None:
 525            job = self.get_job(job_id)
 526            flow_id = job.get("parent_job")
 527
 528        self.post(
 529            f"/w/{workspace}/job_metrics/set_progress/{job_id}",
 530            json={
 531                "percent": value,
 532                "flow_job_id": flow_id or None,
 533            },
 534        )
 535
 536    def get_progress(self, job_id: Optional[str] = None) -> Any:
 537        workspace = get_workspace()
 538        job_id = job_id or os.environ.get("WM_JOB_ID")
 539
 540        r = self.get(
 541            f"/w/{workspace}/job_metrics/get_progress/{job_id}",
 542        )
 543        if r.status_code == 404:
 544            print(f"Job {job_id} does not exist")
 545            return None
 546        else:
 547            return r.json()
 548
 549    def set_flow_user_state(self, key: str, value: Any) -> None:
 550        """Set the user state of a flow at a given key"""
 551        flow_id = self.get_root_job_id()
 552        r = self.post(
 553            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
 554            json=value,
 555            raise_for_status=False,
 556        )
 557        if r.status_code == 404:
 558            print(f"Job {flow_id} does not exist or is not a flow")
 559
 560    def get_flow_user_state(self, key: str) -> Any:
 561        """Get the user state of a flow at a given key"""
 562        flow_id = self.get_root_job_id()
 563        r = self.get(
 564            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
 565            raise_for_status=False,
 566        )
 567        if r.status_code == 404:
 568            print(f"Job {flow_id} does not exist or is not a flow")
 569            return None
 570        else:
 571            return r.json()
 572
 573    @property
 574    def version(self):
 575        return self.get("version").text
 576
 577    def get_duckdb_connection_settings(
 578        self,
 579        s3_resource_path: str = "",
 580    ) -> DuckDbConnectionSettings | None:
 581        """
 582        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
 583        initiate an S3 connection from DuckDB
 584        """
 585        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 586        try:
 587            raw_obj = self.post(
 588                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
 589                json={}
 590                if s3_resource_path == ""
 591                else {"s3_resource_path": s3_resource_path},
 592            ).json()
 593            return DuckDbConnectionSettings(raw_obj)
 594        except JSONDecodeError as e:
 595            raise Exception(
 596                "Could not generate DuckDB S3 connection settings from the provided resource"
 597            ) from e
 598
 599    def get_polars_connection_settings(
 600        self,
 601        s3_resource_path: str = "",
 602    ) -> PolarsConnectionSettings:
 603        """
 604        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
 605        initiate an S3 connection from Polars
 606        """
 607        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 608        try:
 609            raw_obj = self.post(
 610                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
 611                json={}
 612                if s3_resource_path == ""
 613                else {"s3_resource_path": s3_resource_path},
 614            ).json()
 615            return PolarsConnectionSettings(raw_obj)
 616        except JSONDecodeError as e:
 617            raise Exception(
 618                "Could not generate Polars S3 connection settings from the provided resource"
 619            ) from e
 620
 621    def get_boto3_connection_settings(
 622        self,
 623        s3_resource_path: str = "",
 624    ) -> Boto3ConnectionSettings:
 625        """
 626        Convenient helpers that takes an S3 resource as input and returns the settings necessary to
 627        initiate an S3 connection using boto3
 628        """
 629        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
 630        try:
 631            s3_resource = self.post(
 632                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
 633                json={}
 634                if s3_resource_path == ""
 635                else {"s3_resource_path": s3_resource_path},
 636            ).json()
 637            return self.__boto3_connection_settings(s3_resource)
 638        except JSONDecodeError as e:
 639            raise Exception(
 640                "Could not generate Boto3 S3 connection settings from the provided resource"
 641            ) from e
 642
 643    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
 644        """
 645        Load a file from the workspace s3 bucket and returns its content as bytes.
 646
 647        '''python
 648        from wmill import S3Object
 649
 650        s3_obj = S3Object(s3="/path/to/my_file.txt")
 651        my_obj_content = client.load_s3_file(s3_obj)
 652        file_content = my_obj_content.decode("utf-8")
 653        '''
 654        """
 655        s3object = parse_s3_object(s3object)
 656        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
 657            return file_reader.read()
 658
 659    def load_s3_file_reader(
 660        self, s3object: S3Object | str, s3_resource_path: str | None
 661    ) -> BufferedReader:
 662        """
 663        Load a file from the workspace s3 bucket and returns the bytes stream.
 664
 665        '''python
 666        from wmill import S3Object
 667
 668        s3_obj = S3Object(s3="/path/to/my_file.txt")
 669        with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
 670            print(file_reader.read())
 671        '''
 672        """
 673        s3object = parse_s3_object(s3object)
 674        reader = S3BufferedReader(
 675            f"{self.workspace}",
 676            self.client,
 677            s3object["s3"],
 678            s3_resource_path,
 679            s3object["storage"] if "storage" in s3object else None,
 680        )
 681        return reader
 682
 683    def write_s3_file(
 684        self,
 685        s3object: S3Object | str | None,
 686        file_content: BufferedReader | bytes,
 687        s3_resource_path: str | None,
 688        content_type: str | None = None,
 689        content_disposition: str | None = None,
 690    ) -> S3Object:
 691        """
 692        Write a file to the workspace S3 bucket
 693
 694        '''python
 695        from wmill import S3Object
 696
 697        s3_obj = S3Object(s3="/path/to/my_file.txt")
 698
 699        # for an in memory bytes array:
 700        file_content = b'Hello Windmill!'
 701        client.write_s3_file(s3_obj, file_content)
 702
 703        # for a file:
 704        with open("my_file.txt", "rb") as my_file:
 705            client.write_s3_file(s3_obj, my_file)
 706        '''
 707        """
 708        s3object = parse_s3_object(s3object)
 709        # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator
 710        if isinstance(file_content, BufferedReader):
 711            content_payload = bytes_generator(file_content)
 712        elif isinstance(file_content, bytes):
 713            content_payload = file_content
 714        else:
 715            raise Exception("Type of file_content not supported")
 716
 717        query_params = {}
 718        if s3object is not None and s3object["s3"] != "":
 719            query_params["file_key"] = s3object["s3"]
 720        if s3_resource_path is not None and s3_resource_path != "":
 721            query_params["s3_resource_path"] = s3_resource_path
 722        if (
 723            s3object is not None
 724            and "storage" in s3object
 725            and s3object["storage"] is not None
 726        ):
 727            query_params["storage"] = s3object["storage"]
 728        if content_type is not None:
 729            query_params["content_type"] = content_type
 730        if content_disposition is not None:
 731            query_params["content_disposition"] = content_disposition
 732
 733        try:
 734            # need a vanilla client b/c content-type is not application/json here
 735            response = httpx.post(
 736                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
 737                headers={
 738                    "Authorization": f"Bearer {self.token}",
 739                    "Content-Type": "application/octet-stream",
 740                },
 741                params=query_params,
 742                content=content_payload,
 743                verify=self.verify,
 744                timeout=None,
 745            ).json()
 746        except Exception as e:
 747            raise Exception("Could not write file to S3") from e
 748        return S3Object(s3=response["file_key"])
 749
 750    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
 751        return self.post(
 752            f"/w/{self.workspace}/apps/sign_s3_objects", json={"s3_objects": list(map(parse_s3_object, s3_objects))}
 753        ).json()
 754
 755    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
 756        return self.post(
 757            f"/w/{self.workspace}/apps/sign_s3_objects",
 758            json={"s3_objects": [s3_object]},
 759        ).json()[0]
 760
 761    def __boto3_connection_settings(self, s3_resource) -> Boto3ConnectionSettings:
 762        endpoint_url_prefix = "https://" if s3_resource["useSSL"] else "http://"
 763        return Boto3ConnectionSettings(
 764            {
 765                "endpoint_url": "{}{}".format(
 766                    endpoint_url_prefix, s3_resource["endPoint"]
 767                ),
 768                "region_name": s3_resource["region"],
 769                "use_ssl": s3_resource["useSSL"],
 770                "aws_access_key_id": s3_resource["accessKey"],
 771                "aws_secret_access_key": s3_resource["secretKey"],
 772                # no need for path_style here as boto3 is clever enough to determine which one to use
 773            }
 774        )
 775
 776    def whoami(self) -> dict:
 777        return self.get("/users/whoami").json()
 778
 779    @property
 780    def user(self) -> dict:
 781        return self.whoami()
 782
 783    @property
 784    def state_path(self) -> str:
 785        state_path = os.environ.get(
 786            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
 787        )
 788        if state_path is None:
 789            raise Exception("State path not found")
 790        return state_path
 791
 792    @property
 793    def state(self) -> Any:
 794        return self.get_resource(path=self.state_path, none_if_undefined=True)
 795
 796    @state.setter
 797    def state(self, value: Any) -> None:
 798        self.set_state(value)
 799
 800    @staticmethod
 801    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
 802        """
 803        Set the state in the shared folder using pickle
 804        """
 805        import pickle
 806
 807        with open(f"/shared/{path}", "wb") as handle:
 808            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)
 809
 810    @staticmethod
 811    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
 812        """
 813        Get the state in the shared folder using pickle
 814        """
 815        import pickle
 816
 817        with open(f"/shared/{path}", "rb") as handle:
 818            return pickle.load(handle)
 819
 820    @staticmethod
 821    def set_shared_state(value: Any, path: str = "state.json") -> None:
 822        """
 823        Set the state in the shared folder using pickle
 824        """
 825        import json
 826
 827        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
 828            json.dump(value, f, ensure_ascii=False, indent=4)
 829
 830    @staticmethod
 831    def get_shared_state(path: str = "state.json") -> None:
 832        """
 833        Get the state in the shared folder using pickle
 834        """
 835        import json
 836
 837        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
 838            return json.load(f)
 839
 840    def get_resume_urls(self, approver: str = None) -> dict:
 841        nonce = random.randint(0, 1000000000)
 842        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
 843        return self.get(
 844            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
 845            params={"approver": approver},
 846        ).json()
 847
 848    def request_interactive_slack_approval(
 849        self,
 850        slack_resource_path: str,
 851        channel_id: str,
 852        message: str = None,
 853        approver: str = None,
 854        default_args_json: dict = None,
 855        dynamic_enums_json: dict = None,
 856    ) -> None:
 857        """
 858        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
 859
 860        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
 861        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
 862
 863        :param slack_resource_path: The path to the Slack resource in Windmill.
 864        :type slack_resource_path: str
 865        :param channel_id: The Slack channel ID where the approval request will be sent.
 866        :type channel_id: str
 867        :param message: Optional custom message to include in the Slack approval request.
 868        :type message: str, optional
 869        :param approver: Optional user ID or name of the approver for the request.
 870        :type approver: str, optional
 871        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
 872        :type default_args_json: dict, optional
 873        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
 874        :type dynamic_enums_json: dict, optional
 875
 876        :raises Exception: If the function is not called within a flow or flow preview.
 877        :raises Exception: If the required flow job or flow step environment variables are not set.
 878
 879        :return: None
 880
 881        **Usage Example:**
 882            >>> client.request_interactive_slack_approval(
 883            ...     slack_resource_path="/u/alex/my_slack_resource",
 884            ...     channel_id="admins-slack-channel",
 885            ...     message="Please approve this request",
 886            ...     approver="approver123",
 887            ...     default_args_json={"key1": "value1", "key2": 42},
 888            ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
 889            ... )
 890
 891        **Notes:**
 892        - This function must be executed within a Windmill flow or flow preview.
 893        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
 894        """
 895        workspace = self.workspace
 896        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")
 897
 898        if not flow_job_id:
 899            raise Exception(
 900                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
 901            )
 902
 903        # Only include non-empty parameters
 904        params = {}
 905        if message:
 906            params["message"] = message
 907        if approver:
 908            params["approver"] = approver
 909        if slack_resource_path:
 910            params["slack_resource_path"] = slack_resource_path
 911        if channel_id:
 912            params["channel_id"] = channel_id
 913        if os.environ.get("WM_FLOW_STEP_ID"):
 914            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
 915        if default_args_json:
 916            params["default_args_json"] = json.dumps(default_args_json)
 917        if dynamic_enums_json:
 918            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)
 919
 920        self.get(
 921            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
 922            params=params,
 923        )
 924
 925    def username_to_email(self, username: str) -> str:
 926        """
 927        Get email from workspace username
 928        This method is particularly useful for apps that require the email address of the viewer.
 929        Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
 930        """
 931        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text
 932
 933    def send_teams_message(
 934        self,
 935        conversation_id: str,
 936        text: str,
 937        success: bool = True,
 938        card_block: dict = None,
 939    ):
 940        """
 941        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
 942        """
 943        return self.post(
 944            f"/teams/activities",
 945            json={
 946                "conversation_id": conversation_id,
 947                "text": text,
 948                "success": success,
 949                "card_block": card_block,
 950            },
 951        )
 952
 953
 954def init_global_client(f):
 955    @functools.wraps(f)
 956    def wrapper(*args, **kwargs):
 957        global _client
 958        if _client is None:
 959            _client = Windmill()
 960        return f(*args, **kwargs)
 961
 962    return wrapper
 963
 964
 965def deprecate(in_favor_of: str):
 966    def decorator(f):
 967        @functools.wraps(f)
 968        def wrapper(*args, **kwargs):
 969            warnings.warn(
 970                (
 971                    f"The '{f.__name__}' method is deprecated and may be removed in the future. "
 972                    f"Consider {in_favor_of}"
 973                ),
 974                DeprecationWarning,
 975            )
 976            return f(*args, **kwargs)
 977
 978        return wrapper
 979
 980    return decorator
 981
 982
 983@init_global_client
 984def get_workspace() -> str:
 985    return _client.workspace
 986
 987
 988@init_global_client
 989def get_root_job_id(job_id: str | None = None) -> str:
 990    return _client.get_root_job_id(job_id)
 991
 992
 993@init_global_client
 994@deprecate("Windmill().version")
 995def get_version() -> str:
 996    return _client.version
 997
 998
 999@init_global_client
1000def run_script_async(
1001    hash_or_path: str,
1002    args: Dict[str, Any] = None,
1003    scheduled_in_secs: int = None,
1004) -> str:
1005    is_path = "/" in hash_or_path
1006    hash_ = None if is_path else hash_or_path
1007    path = hash_or_path if is_path else None
1008    return _client.run_script_async(
1009        hash_=hash_,
1010        path=path,
1011        args=args,
1012        scheduled_in_secs=scheduled_in_secs,
1013    )
1014
1015
1016@init_global_client
1017def run_flow_async(
1018    path: str,
1019    args: Dict[str, Any] = None,
1020    scheduled_in_secs: int = None,
1021    # can only be set to false if this the job will be fully await and not concurrent with any other job
1022    # as otherwise the child flow and its own child will store their state in the parent job which will
1023    # lead to incorrectness and failures
1024    do_not_track_in_parent: bool = True,
1025) -> str:
1026    return _client.run_flow_async(
1027        path=path,
1028        args=args,
1029        scheduled_in_secs=scheduled_in_secs,
1030        do_not_track_in_parent=do_not_track_in_parent,
1031    )
1032
1033
1034@init_global_client
1035def run_script_sync(
1036    hash: str,
1037    args: Dict[str, Any] = None,
1038    verbose: bool = False,
1039    assert_result_is_not_none: bool = True,
1040    cleanup: bool = True,
1041    timeout: dt.timedelta = None,
1042) -> Any:
1043    return _client.run_script(
1044        hash_=hash,
1045        args=args,
1046        verbose=verbose,
1047        assert_result_is_not_none=assert_result_is_not_none,
1048        cleanup=cleanup,
1049        timeout=timeout,
1050    )
1051
1052
1053@init_global_client
1054def run_script_by_path_async(
1055    path: str,
1056    args: Dict[str, Any] = None,
1057    scheduled_in_secs: Union[None, int] = None,
1058) -> str:
1059    return _client.run_script_by_path_async(
1060        path=path,
1061        args=args,
1062        scheduled_in_secs=scheduled_in_secs,
1063    )
1064
1065
1066@init_global_client
1067def run_script_by_hash_async(
1068    hash_: str,
1069    args: Dict[str, Any] = None,
1070    scheduled_in_secs: Union[None, int] = None,
1071) -> str:
1072    return _client.run_script_by_hash_async(
1073        hash_=hash_,
1074        args=args,
1075        scheduled_in_secs=scheduled_in_secs,
1076    )
1077
1078
1079@init_global_client
1080def run_script_by_path_sync(
1081    path: str,
1082    args: Dict[str, Any] = None,
1083    verbose: bool = False,
1084    assert_result_is_not_none: bool = True,
1085    cleanup: bool = True,
1086    timeout: dt.timedelta = None,
1087) -> Any:
1088    return _client.run_script(
1089        path=path,
1090        args=args,
1091        verbose=verbose,
1092        assert_result_is_not_none=assert_result_is_not_none,
1093        cleanup=cleanup,
1094        timeout=timeout,
1095    )
1096
1097
1098@init_global_client
1099def get_id_token(audience: str) -> str:
1100    """
1101    Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc.
1102    """
1103    return _client.get_id_token(audience)
1104
1105
1106@init_global_client
1107def get_job_status(job_id: str) -> JobStatus:
1108    return _client.get_job_status(job_id)
1109
1110
1111@init_global_client
1112def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
1113    return _client.get_result(
1114        job_id=job_id, assert_result_is_not_none=assert_result_is_not_none
1115    )
1116
1117
1118@init_global_client
1119def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings:
1120    """
1121    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1122    initiate an S3 connection from DuckDB
1123    """
1124    return _client.get_duckdb_connection_settings(s3_resource_path)
1125
1126
1127@init_global_client
1128def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings:
1129    """
1130    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1131    initiate an S3 connection from Polars
1132    """
1133    return _client.get_polars_connection_settings(s3_resource_path)
1134
1135
1136@init_global_client
1137def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings:
1138    """
1139    Convenient helpers that takes an S3 resource as input and returns the settings necessary to
1140    initiate an S3 connection using boto3
1141    """
1142    return _client.get_boto3_connection_settings(s3_resource_path)
1143
1144
1145@init_global_client
1146def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes:
1147    """
1148    Load the entire content of a file stored in S3 as bytes
1149    """
1150    return _client.load_s3_file(
1151        s3object, s3_resource_path if s3_resource_path != "" else None
1152    )
1153
1154
1155@init_global_client
1156def load_s3_file_reader(
1157    s3object: S3Object | str, s3_resource_path: str | None = None
1158) -> BufferedReader:
1159    """
1160    Load the content of a file stored in S3
1161    """
1162    return _client.load_s3_file_reader(
1163        s3object, s3_resource_path if s3_resource_path != "" else None
1164    )
1165
1166
1167@init_global_client
1168def write_s3_file(
1169    s3object: S3Object | str | None,
1170    file_content: BufferedReader | bytes,
1171    s3_resource_path: str | None = None,
1172    content_type: str | None = None,
1173    content_disposition: str | None = None,
1174) -> S3Object:
1175    """
1176    Upload a file to S3
1177
1178    Content type will be automatically guessed from path extension if left empty
1179
1180    See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
1181    and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
1182
1183    """
1184    return _client.write_s3_file(
1185        s3object,
1186        file_content,
1187        s3_resource_path if s3_resource_path != "" else None,
1188        content_type,
1189        content_disposition,
1190    )
1191
1192
1193@init_global_client
1194def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]:
1195    """
1196    Sign S3 objects to be used by anonymous users in public apps
1197    Returns a list of signed s3 tokens
1198    """
1199    return _client.sign_s3_objects(s3_objects)
1200
1201
1202@init_global_client
1203def sign_s3_object(s3_object: S3Object| str) -> S3Object:
1204    """
1205    Sign S3 object to be used by anonymous users in public apps
1206    Returns a signed s3 object
1207    """
1208    return _client.sign_s3_object(s3_object)
1209
1210
1211@init_global_client
1212def whoami() -> dict:
1213    """
1214    Returns the current user
1215    """
1216    return _client.user
1217
1218
1219@init_global_client
1220@deprecate("Windmill().state")
1221def get_state() -> Any:
1222    """
1223    Get the state
1224    """
1225    return _client.state
1226
1227
1228@init_global_client
1229def get_resource(
1230    path: str,
1231    none_if_undefined: bool = False,
1232) -> dict | None:
1233    """Get resource from Windmill"""
1234    return _client.get_resource(path, none_if_undefined)
1235
1236
1237@init_global_client
1238def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
1239    """
1240    Set the resource at a given path as a string, creating it if it does not exist
1241    """
1242    return _client.set_resource(value=value, path=path, resource_type=resource_type)
1243
1244
1245@init_global_client
1246def set_state(value: Any) -> None:
1247    """
1248    Set the state
1249    """
1250    return _client.set_state(value)
1251
1252
1253@init_global_client
1254def set_progress(value: int, job_id: Optional[str] = None) -> None:
1255    """
1256    Set the progress
1257    """
1258    return _client.set_progress(value, job_id)
1259
1260
1261@init_global_client
1262def get_progress(job_id: Optional[str] = None) -> Any:
1263    """
1264    Get the progress
1265    """
1266    return _client.get_progress(job_id)
1267
1268
1269def set_shared_state_pickle(value: Any, path="state.pickle") -> None:
1270    """
1271    Set the state in the shared folder using pickle
1272    """
1273    return Windmill.set_shared_state_pickle(value=value, path=path)
1274
1275
1276@deprecate("Windmill.get_shared_state_pickle(...)")
1277def get_shared_state_pickle(path="state.pickle") -> Any:
1278    """
1279    Get the state in the shared folder using pickle
1280    """
1281    return Windmill.get_shared_state_pickle(path=path)
1282
1283
1284def set_shared_state(value: Any, path="state.json") -> None:
1285    """
1286    Set the state in the shared folder using pickle
1287    """
1288    return Windmill.set_shared_state(value=value, path=path)
1289
1290
1291def get_shared_state(path="state.json") -> None:
1292    """
1293    Get the state in the shared folder using pickle
1294    """
1295    return Windmill.get_shared_state(path=path)
1296
1297
1298@init_global_client
1299def get_variable(path: str) -> str:
1300    """
1301    Returns the variable at a given path as a string
1302    """
1303    return _client.get_variable(path)
1304
1305
1306@init_global_client
1307def set_variable(path: str, value: str, is_secret: bool = False) -> None:
1308    """
1309    Set the variable at a given path as a string, creating it if it does not exist
1310    """
1311    return _client.set_variable(path, value, is_secret)
1312
1313
1314@init_global_client
1315def get_flow_user_state(key: str) -> Any:
1316    """
1317    Get the user state of a flow at a given key
1318    """
1319    return _client.get_flow_user_state(key)
1320
1321
1322@init_global_client
1323def set_flow_user_state(key: str, value: Any) -> None:
1324    """
1325    Set the user state of a flow at a given key
1326    """
1327    return _client.set_flow_user_state(key, value)
1328
1329
1330@init_global_client
1331def get_state_path() -> str:
1332    return _client.state_path
1333
1334
1335@init_global_client
1336def get_resume_urls(approver: str = None) -> dict:
1337    return _client.get_resume_urls(approver)
1338
1339
1340@init_global_client
1341def request_interactive_slack_approval(
1342    slack_resource_path: str,
1343    channel_id: str,
1344    message: str = None,
1345    approver: str = None,
1346    default_args_json: dict = None,
1347    dynamic_enums_json: dict = None,
1348) -> None:
1349    return _client.request_interactive_slack_approval(
1350        slack_resource_path=slack_resource_path,
1351        channel_id=channel_id,
1352        message=message,
1353        approver=approver,
1354        default_args_json=default_args_json,
1355        dynamic_enums_json=dynamic_enums_json,
1356    )
1357
1358
1359@init_global_client
1360def send_teams_message(
1361    conversation_id: str, text: str, success: bool, card_block: dict = None
1362):
1363    return _client.send_teams_message(conversation_id, text, success, card_block)
1364
1365
1366@init_global_client
1367def cancel_running() -> dict:
1368    """Cancel currently running executions of the same script."""
1369    return _client.cancel_running()
1370
1371
1372@init_global_client
1373def run_script(
1374    path: str = None,
1375    hash_: str = None,
1376    args: dict = None,
1377    timeout: dt.timedelta | int | float = None,
1378    verbose: bool = False,
1379    cleanup: bool = True,
1380    assert_result_is_not_none: bool = True,
1381) -> Any:
1382    """Run script synchronously and return its result.
1383    
1384    .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
1385    """
1386    return _client.run_script(
1387        path=path,
1388        hash_=hash_,
1389        args=args,
1390        verbose=verbose,
1391        assert_result_is_not_none=assert_result_is_not_none,
1392        cleanup=cleanup,
1393        timeout=timeout,
1394    )
1395
1396
1397@init_global_client
1398def run_script_by_path(
1399    path: str,
1400    args: dict = None,
1401    timeout: dt.timedelta | int | float = None,
1402    verbose: bool = False,
1403    cleanup: bool = True,
1404    assert_result_is_not_none: bool = True,
1405) -> Any:
1406    """Run script by path synchronously and return its result."""
1407    return _client.run_script_by_path(
1408        path=path,
1409        args=args,
1410        verbose=verbose,
1411        assert_result_is_not_none=assert_result_is_not_none,
1412        cleanup=cleanup,
1413        timeout=timeout,
1414    )
1415
1416
1417@init_global_client
1418def run_script_by_hash(
1419    hash_: str,
1420    args: dict = None,
1421    timeout: dt.timedelta | int | float = None,
1422    verbose: bool = False,
1423    cleanup: bool = True,
1424    assert_result_is_not_none: bool = True,
1425) -> Any:
1426    """Run script by hash synchronously and return its result."""
1427    return _client.run_script_by_hash(
1428        hash_=hash_,
1429        args=args,
1430        verbose=verbose,
1431        assert_result_is_not_none=assert_result_is_not_none,
1432        cleanup=cleanup,
1433        timeout=timeout,
1434    )
1435
1436
1437@init_global_client
1438def username_to_email(username: str) -> str:
1439    """
1440    Get email from workspace username
1441    This method is particularly useful for apps that require the email address of the viewer.
1442    Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
1443    """
1444    return _client.username_to_email(username)
1445
1446
1447def task(*args, **kwargs):
1448    from inspect import signature
1449
1450    def f(func, tag: str | None = None):
1451        if (
1452            os.environ.get("WM_JOB_ID") is None
1453            or os.environ.get("MAIN_OVERRIDE") == func.__name__
1454        ):
1455
1456            def inner(*args, **kwargs):
1457                return func(*args, **kwargs)
1458
1459            return inner
1460        else:
1461
1462            def inner(*args, **kwargs):
1463                global _client
1464                if _client is None:
1465                    _client = Windmill()
1466                w_id = os.environ.get("WM_WORKSPACE")
1467                job_id = os.environ.get("WM_JOB_ID")
1468                f_name = func.__name__
1469                json = kwargs
1470                params = list(signature(func).parameters)
1471                for i, arg in enumerate(args):
1472                    if i < len(params):
1473                        p = params[i]
1474                        key = p
1475                        if key not in kwargs:
1476                            json[key] = arg
1477
1478                params = {}
1479                if tag is not None:
1480                    params["tag"] = tag
1481                w_as_code_response = _client.post(
1482                    f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}",
1483                    json={"args": json},
1484                    params=params,
1485                )
1486                job_id = w_as_code_response.text
1487                print(f"Executing task {func.__name__} on job {job_id}")
1488                job_result = _client.wait_job(job_id)
1489                print(f"Task {func.__name__} ({job_id}) completed")
1490                return job_result
1491
1492            return inner
1493
1494    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
1495        return f(args[0], None)
1496    else:
1497        return lambda x: f(x, kwargs.get("tag"))
1498
1499def parse_resource_syntax(s: str) -> Optional[str]:
1500    """Parse resource syntax from string."""
1501    if s is None:
1502        return None
1503    if s.startswith("$res:"):
1504        return s[5:]
1505    if s.startswith("res://"):
1506        return s[6:]
1507    return None
1508
1509def parse_s3_object(s3_object: S3Object | str) -> S3Object:
1510    """Parse S3 object from string or S3Object format."""
1511    if isinstance(s3_object, str):
1512        match = re.match(r'^s3://([^/]*)/(.*)$', s3_object)
1513        if match:
1514            return S3Object(s3=match.group(2) or "", storage=match.group(1) or None)
1515        return S3Object(s3="")
1516    else:
1517        return s3_object
1518
1519    
1520
1521def parse_variable_syntax(s: str) -> Optional[str]:
1522    """Parse variable syntax from string."""
1523    if s.startswith("var://"):
1524        return s[6:]
1525    return None
1526
1527
1528def append_to_result_stream(text: str) -> None:
1529    """Append a text to the result stream.
1530    
1531    Args:
1532        text: text to append to the result stream
1533    """
1534    print("WM_STREAM: {}".format(text.replace(chr(10), '\\n')))
1535
1536def stream_result(stream) -> None:
1537    """Stream to the result stream.
1538    
1539    Args:
1540        stream: stream to stream to the result stream
1541    """
1542    for text in stream:
1543        append_to_result_stream(text)
logger = <Logger windmill_client (WARNING)>
JobStatus = typing.Literal['RUNNING', 'WAITING', 'COMPLETED']
class Windmill:
Windmill(base_url=None, token=None, workspace=None, verify=True)
base_url
token
headers
verify
client
workspace
path
mocked_api
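
As a quick orientation, a minimal construction sketch (all values below are placeholders):

    import os
    from wmill.client import Windmill

    # Inside a Windmill job, BASE_INTERNAL_URL / WM_BASE_URL, WM_TOKEN and
    # WM_WORKSPACE are set by the runtime, so no arguments are needed:
    client = Windmill()

    # Outside a job, pass everything explicitly ("/api" is appended to base_url):
    client = Windmill(
        base_url="https://app.windmill.dev",  # placeholder instance URL
        token=os.environ["WM_TOKEN"],         # placeholder token source
        workspace="demo",                     # placeholder workspace id
    )
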
def get_mocked_api(self) -> Optional[dict]:
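
For tests, the mocked API file lets the variable and resource accessors work without a server. A sketch of the expected file shape, inferred from the defaults merged above (file name, paths, and values are made up):

    # mocked_api.json -- top-level "variables" and "resources" maps:
    # {
    #   "variables": {"u/admin/greeting": "hello"},
    #   "resources": {"u/admin/pg": {"host": "localhost", "port": 5432}}
    # }
    import os
    from wmill.client import Windmill

    os.environ["WM_MOCKED_API_FILE"] = "mocked_api.json"
    client = Windmill(base_url="http://localhost", workspace="demo", token="dummy")
    print(client.get_variable("u/admin/greeting"))  # served from the mock; no request sent
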
def get_client(self) -> httpx.Client:
def get(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
def post(self, endpoint, raise_for_status=True, **kwargs) -> httpx.Response:
def create_token(self, duration=datetime.timedelta(days=1)) -> str:
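
A short sketch, assuming a client constructed as above; the returned string is the new token itself:

    import datetime as dt

    new_token = client.create_token(duration=dt.timedelta(hours=2))
    # the token is labeled "refresh <unix time>" and expires at now + duration
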
def run_script_async( self, path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str:

Create a script job and return its job id.

Deprecated: use run_script_by_path_async or run_script_by_hash_async instead.

def run_script_by_path_async(self, path: str, args: dict = None, scheduled_in_secs: int = None) -> str:

Create a script job by path and return its job id.
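
For example (script path and args are placeholders):

    job_id = client.run_script_by_path_async(
        "u/admin/hello_world",  # placeholder script path
        args={"name": "world"},
        scheduled_in_secs=30,   # optional: run 30 seconds from now
    )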

def run_script_by_hash_async( self, hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str:

Create a script job by hash and return its job id.

def run_flow_async( self, path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str:

Create a flow job and return its job id.
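
A sketch with a placeholder flow path. Per the comment in the source, leave do_not_track_in_parent at its default (True) unless the child flow is fully awaited and not concurrent with any other job:

    flow_job_id = client.run_flow_async(
        "f/examples/etl",           # placeholder flow path
        args={"day": "2024-01-01"},
    )
    result = client.wait_job(flow_job_id)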

def run_script( self, path: str = None, hash_: str = None, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:

Run script synchronously and return its result.

Deprecated: use run_script_by_path or run_script_by_hash instead.

def run_script_by_path( self, path: str, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:

Run script by path synchronously and return its result.
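
For example (placeholder path; timeout accepts a timedelta or a number of seconds):

    import datetime as dt

    result = client.run_script_by_path(
        "u/admin/hello_world",            # placeholder path
        args={"name": "world"},
        timeout=dt.timedelta(minutes=5),  # past this, the job is cancelled and TimeoutError raised
        assert_result_is_not_none=True,   # raise if the script returned None
    )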

def run_script_by_hash( self, hash_: str, args: dict = None, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any:

Run script by hash synchronously and return its result.

def wait_job( self, job_id, timeout: datetime.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False):
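
wait_job polls get_result_maybe every 0.5 seconds until the job completes; while waiting (with cleanup=True) it keeps an atexit hook registered that cancels the job, and when timeout elapses it cancels the job server-side before raising TimeoutError. A usage sketch with a placeholder script path:

    job_id = client.run_script_by_path_async("u/admin/hello_world", args={})
    try:
        result = client.wait_job(job_id, timeout=300, verbose=True)
    except TimeoutError:
        result = None  # the job was already cancelled with reason "reached timeout"
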
def cancel_running(self) -> dict:

Cancel currently running executions of the same script.

def get_job(self, job_id: str) -> dict:
def get_root_job_id(self, job_id: str | None = None) -> dict:
def get_id_token(self, audience: str) -> str:
def get_job_status(self, job_id: str) -> Literal['RUNNING', 'WAITING', 'COMPLETED']:
def get_result(self, job_id: str, assert_result_is_not_none: bool = True) -> Any:
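
Given a job id from one of the async runners, get_job_status and get_result compose into a simple non-blocking poll; a sketch:

    status = client.get_job_status(job_id)  # "RUNNING" | "WAITING" | "COMPLETED"
    if status == "COMPLETED":
        # get_result falls back to the raw text when the body is not valid JSON
        result = client.get_result(job_id, assert_result_is_not_none=False)
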
def get_variable(self, path: str) -> str:
def set_variable(self, path: str, value: str, is_secret: bool = False) -> None:
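
set_variable creates the variable when the path does not yet exist (404 on lookup) and updates it otherwise; get_variable also accepts the var:// prefix. A sketch with a placeholder path:

    client.set_variable("u/admin/greeting", "hello", is_secret=False)
    value = client.get_variable("var://u/admin/greeting")  # same as "u/admin/greeting"
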
def get_resource(self, path: str, none_if_undefined: bool = False) -> dict | None:
def set_resource(self, value: Any, path: str, resource_type: str):
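
set_resource follows the same create-or-update pattern, and get_resource accepts the $res: and res:// prefixes and can return None instead of raising. A sketch with placeholder path and value:

    client.set_resource(
        {"host": "db.internal", "port": 5432},  # placeholder resource value
        path="u/admin/pg",
        resource_type="postgresql",
    )
    pg = client.get_resource("$res:u/admin/pg")
    maybe = client.get_resource("u/admin/missing", none_if_undefined=True)  # -> None
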
def set_state(self, value: Any):
def set_progress(self, value: int, job_id: Optional[str] = None):
def get_progress(self, job_id: Optional[str] = None) -> Any:
537    def get_progress(self, job_id: Optional[str] = None) -> Any:
538        workspace = get_workspace()
539        job_id = job_id or os.environ.get("WM_JOB_ID")
540
541        r = self.get(
542            f"/w/{workspace}/job_metrics/get_progress/{job_id}", raise_for_status=False
543        )
544        if r.status_code == 404:
545            print(f"Job {job_id} does not exist")
546            return None
547        else:
548            return r.json()
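
For example, a long-running step can report its completion percentage as it goes; a sketch (process_item is a hypothetical work function, and the job id is inferred from WM_JOB_ID):

```python
import wmill

client = wmill.Windmill()

items = list(range(50))
for i, item in enumerate(items):
    process_item(item)  # hypothetical
    client.set_progress(int(100 * (i + 1) / len(items)))

print(client.get_progress())  # -> 100
```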
def set_flow_user_state(self, key: str, value: Any) -> None:
550    def set_flow_user_state(self, key: str, value: Any) -> None:
551        """Set the user state of a flow at a given key"""
552        flow_id = self.get_root_job_id()
553        r = self.post(
554            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
555            json=value,
556            raise_for_status=False,
557        )
558        if r.status_code == 404:
559            print(f"Job {flow_id} does not exist or is not a flow")
def get_flow_user_state(self, key: str) -> Any:
561    def get_flow_user_state(self, key: str) -> Any:
562        """Get the user state of a flow at a given key"""
563        flow_id = self.get_root_job_id()
564        r = self.get(
565            f"/w/{self.workspace}/jobs/flow/user_states/{flow_id}/{key}",
566            raise_for_status=False,
567        )
568        if r.status_code == 404:
569            print(f"Job {flow_id} does not exist or is not a flow")
570            return None
571        else:
572            return r.json()
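
Together these form a small per-flow-run key/value store; a sketch (the key is arbitrary, and both calls must run inside a flow so the root job can be resolved):

```python
import wmill

client = wmill.Windmill()

# One step of a flow run: remember a cursor
client.set_flow_user_state("last_seen_id", 12345)

# A later step of the same run: read it back
cursor = client.get_flow_user_state("last_seen_id")
```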
version
574    @property
575    def version(self):
576        return self.get("version").text
def get_duckdb_connection_settings(self, s3_resource_path: str = '') -> wmill.s3_types.DuckDbConnectionSettings | None:
578    def get_duckdb_connection_settings(
579        self,
580        s3_resource_path: str = "",
581    ) -> DuckDbConnectionSettings | None:
582        """
583        Convenience helper that takes an S3 resource as input and returns the settings necessary to
584        initiate an S3 connection from DuckDB
585        """
586        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
587        try:
588            raw_obj = self.post(
589                f"/w/{self.workspace}/job_helpers/v2/duckdb_connection_settings",
590                json={}
591                if s3_resource_path == ""
592                else {"s3_resource_path": s3_resource_path},
593            ).json()
594            return DuckDbConnectionSettings(raw_obj)
595        except JSONDecodeError as e:
596            raise Exception(
597                "Could not generate DuckDB S3 connection settings from the provided resource"
598            ) from e
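
A sketch of wiring this into DuckDB, assuming the workspace has S3 storage configured; the `connection_settings_str` field name follows wmill.s3_types, and the bucket path is hypothetical:

```python
import duckdb
import wmill

client = wmill.Windmill()

settings = client.get_duckdb_connection_settings()
con = duckdb.connect()
# Apply the generated S3 configuration (endpoint, credentials, region, ...);
# use settings["connection_settings_str"] if your version returns a plain dict
con.execute(settings.connection_settings_str)
df = con.execute("SELECT * FROM read_parquet('s3://my-bucket/data.parquet')").df()
```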
def get_polars_connection_settings(self, s3_resource_path: str = '') -> wmill.s3_types.PolarsConnectionSettings:
600    def get_polars_connection_settings(
601        self,
602        s3_resource_path: str = "",
603    ) -> PolarsConnectionSettings:
604        """
605        Convenience helper that takes an S3 resource as input and returns the settings necessary to
606        initiate an S3 connection from Polars
607        """
608        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
609        try:
610            raw_obj = self.post(
611                f"/w/{self.workspace}/job_helpers/v2/polars_connection_settings",
612                json={}
613                if s3_resource_path == ""
614                else {"s3_resource_path": s3_resource_path},
615            ).json()
616            return PolarsConnectionSettings(raw_obj)
617        except JSONDecodeError as e:
618            raise Exception(
619                "Could not generate Polars S3 connection settings from the provided resource"
620            ) from e
def get_boto3_connection_settings(self, s3_resource_path: str = '') -> wmill.s3_types.Boto3ConnectionSettings:
622    def get_boto3_connection_settings(
623        self,
624        s3_resource_path: str = "",
625    ) -> Boto3ConnectionSettings:
626        """
627        Convenience helper that takes an S3 resource as input and returns the settings necessary to
628        initiate an S3 connection using boto3
629        """
630        s3_resource_path = parse_resource_syntax(s3_resource_path) or s3_resource_path
631        try:
632            s3_resource = self.post(
633                f"/w/{self.workspace}/job_helpers/v2/s3_resource_info",
634                json={}
635                if s3_resource_path == ""
636                else {"s3_resource_path": s3_resource_path},
637            ).json()
638            return self.__boto3_connection_settings(s3_resource)
639        except JSONDecodeError as e:
640            raise Exception(
641                "Could not generate Boto3 S3 connection settings from the provided resource"
642            ) from e
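
The boto3 variant returns dict-like settings whose keys are meant to line up with boto3.client keyword arguments, so they can be splatted straight into the constructor; a sketch (bucket and key are hypothetical):

```python
import boto3
import wmill

client = wmill.Windmill()

settings = client.get_boto3_connection_settings()
s3 = boto3.client("s3", **settings)
s3.download_file("my-bucket", "data/input.csv", "/tmp/input.csv")
```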
def load_s3_file(self, s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None) -> bytes:
644    def load_s3_file(self, s3object: S3Object | str, s3_resource_path: str | None) -> bytes:
645        """
646        Load a file from the workspace s3 bucket and return its content as bytes.
647
648        ```python
649        from wmill import S3Object
650
651        s3_obj = S3Object(s3="/path/to/my_file.txt")
652        my_obj_content = client.load_s3_file(s3_obj, None)
653        file_content = my_obj_content.decode("utf-8")
654        ```
655        """
656        s3object = parse_s3_object(s3object)
657        with self.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
658            return file_reader.read()
def load_s3_file_reader(self, s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None) -> _io.BufferedReader:
660    def load_s3_file_reader(
661        self, s3object: S3Object | str, s3_resource_path: str | None
662    ) -> BufferedReader:
663        """
664        Load a file from the workspace s3 bucket and return a bytes stream.
665
666        ```python
667        from wmill import S3Object
668
669        s3_obj = S3Object(s3="/path/to/my_file.txt")
670        with client.load_s3_file_reader(s3_obj, None) as file_reader:
671            print(file_reader.read())
672        ```
673        """
674        s3object = parse_s3_object(s3object)
675        reader = S3BufferedReader(
676            self.workspace,
677            self.client,
678            s3object["s3"],
679            s3_resource_path,
680            s3object["storage"] if "storage" in s3object else None,
681        )
682        return reader
def write_s3_file(self, s3object: wmill.s3_types.S3Object | str | None, file_content: _io.BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> wmill.s3_types.S3Object:
684    def write_s3_file(
685        self,
686        s3object: S3Object | str | None,
687        file_content: BufferedReader | bytes,
688        s3_resource_path: str | None,
689        content_type: str | None = None,
690        content_disposition: str | None = None,
691    ) -> S3Object:
692        """
693        Write a file to the workspace S3 bucket
694
695        ```python
696        from wmill import S3Object
697
698        s3_obj = S3Object(s3="/path/to/my_file.txt")
699
700        # for an in memory bytes array:
701        file_content = b'Hello Windmill!'
702        client.write_s3_file(s3_obj, file_content, None)
703
704        # for a file:
705        with open("my_file.txt", "rb") as my_file:
706            client.write_s3_file(s3_obj, my_file, None)
707        ```
708        """
709        s3object = parse_s3_object(s3object)
710        # httpx accepts either bytes or "a bytes generator" as content. If it's a BufferedReader, we need to convert it to a generator
711        if isinstance(file_content, BufferedReader):
712            content_payload = bytes_generator(file_content)
713        elif isinstance(file_content, bytes):
714            content_payload = file_content
715        else:
716            raise Exception("Type of file_content not supported")
717
718        query_params = {}
719        if s3object is not None and s3object["s3"] != "":
720            query_params["file_key"] = s3object["s3"]
721        if s3_resource_path is not None and s3_resource_path != "":
722            query_params["s3_resource_path"] = s3_resource_path
723        if (
724            s3object is not None
725            and "storage" in s3object
726            and s3object["storage"] is not None
727        ):
728            query_params["storage"] = s3object["storage"]
729        if content_type is not None:
730            query_params["content_type"] = content_type
731        if content_disposition is not None:
732            query_params["content_disposition"] = content_disposition
733
734        try:
735            # need a vanilla client b/c content-type is not application/json here
736            response = httpx.post(
737                f"{self.base_url}/w/{self.workspace}/job_helpers/upload_s3_file",
738                headers={
739                    "Authorization": f"Bearer {self.token}",
740                    "Content-Type": "application/octet-stream",
741                },
742                params=query_params,
743                content=content_payload,
744                verify=self.verify,
745                timeout=None,
746            ).json()
747        except Exception as e:
748            raise Exception("Could not write file to S3") from e
749        return S3Object(s3=response["file_key"])
def sign_s3_objects(self, s3_objects: list[wmill.s3_types.S3Object | str]) -> list[wmill.s3_types.S3Object]:
751    def sign_s3_objects(self, s3_objects: list[S3Object | str]) -> list[S3Object]:
752        return self.post(
753            f"/w/{self.workspace}/apps/sign_s3_objects",
754            json={"s3_objects": list(map(parse_s3_object, s3_objects))},
755        ).json()
def sign_s3_object(self, s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
756    def sign_s3_object(self, s3_object: S3Object | str) -> S3Object:
757        return self.post(
758            f"/w/{self.workspace}/apps/sign_s3_objects",
759            json={"s3_objects": [parse_s3_object(s3_object)]},
760        ).json()[0]
def whoami(self) -> dict:
777    def whoami(self) -> dict:
778        return self.get("/users/whoami").json()
user: dict
780    @property
781    def user(self) -> dict:
782        return self.whoami()
state_path: str
784    @property
785    def state_path(self) -> str:
786        state_path = os.environ.get(
787            "WM_STATE_PATH_NEW", os.environ.get("WM_STATE_PATH")
788        )
789        if state_path is None:
790            raise Exception("State path not found")
791        return state_path
state: Any
793    @property
794    def state(self) -> Any:
795        return self.get_resource(path=self.state_path, none_if_undefined=True)
@staticmethod
def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None:
801    @staticmethod
802    def set_shared_state_pickle(value: Any, path: str = "state.pickle") -> None:
803        """
804        Set the state in the shared folder using pickle
805        """
806        import pickle
807
808        with open(f"/shared/{path}", "wb") as handle:
809            pickle.dump(value, handle, protocol=pickle.HIGHEST_PROTOCOL)
@staticmethod
def get_shared_state_pickle(path: str = 'state.pickle') -> Any:
811    @staticmethod
812    def get_shared_state_pickle(path: str = "state.pickle") -> Any:
813        """
814        Get the state in the shared folder using pickle
815        """
816        import pickle
817
818        with open(f"/shared/{path}", "rb") as handle:
819            return pickle.load(handle)
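
Both helpers are static, so they work without a client instance; a sketch of handing a Python object from one flow step to the next through the /shared folder (available when the flow runs with a shared directory):

```python
from wmill import Windmill

# Step A: stash an arbitrary Python object
Windmill.set_shared_state_pickle({"rows": [1, 2, 3]}, path="step_a.pickle")

# Step B of the same flow run: read it back
data = Windmill.get_shared_state_pickle(path="step_a.pickle")
```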
@staticmethod
def set_shared_state(value: Any, path: str = 'state.json') -> None:
821    @staticmethod
822    def set_shared_state(value: Any, path: str = "state.json") -> None:
823        """
824        Set the state in the shared folder using JSON
825        """
826        import json
827
828        with open(f"/shared/{path}", "w", encoding="utf-8") as f:
829            json.dump(value, f, ensure_ascii=False, indent=4)
@staticmethod
def get_shared_state(path: str = 'state.json') -> Any:
831    @staticmethod
832    def get_shared_state(path: str = "state.json") -> Any:
833        """
834        Get the state in the shared folder using JSON
835        """
836        import json
837
838        with open(f"/shared/{path}", "r", encoding="utf-8") as f:
839            return json.load(f)
def get_resume_urls(self, approver: str = None) -> dict:
841    def get_resume_urls(self, approver: str = None) -> dict:
842        nonce = random.randint(0, 1000000000)
843        job_id = os.environ.get("WM_JOB_ID") or "NO_ID"
844        return self.get(
845            f"/w/{self.workspace}/jobs/resume_urls/{job_id}/{nonce}",
846            params={"approver": approver},
847        ).json()
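
A sketch of the usual approval pattern built on this; the `resume` and `cancel` keys of the returned dict, and the notifier, are assumptions to verify against your setup:

```python
import wmill

client = wmill.Windmill()

urls = client.get_resume_urls(approver="alex")
notify_approver(  # hypothetical out-of-band notification helper
    f"Approve: {urls['resume']}\nReject: {urls['cancel']}"
)
```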
def request_interactive_slack_approval(self, slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None:
849    def request_interactive_slack_approval(
850        self,
851        slack_resource_path: str,
852        channel_id: str,
853        message: str = None,
854        approver: str = None,
855        default_args_json: dict = None,
856        dynamic_enums_json: dict = None,
857    ) -> None:
858        """
859        Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
860
861        **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
862        Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
863
864        :param slack_resource_path: The path to the Slack resource in Windmill.
865        :type slack_resource_path: str
866        :param channel_id: The Slack channel ID where the approval request will be sent.
867        :type channel_id: str
868        :param message: Optional custom message to include in the Slack approval request.
869        :type message: str, optional
870        :param approver: Optional user ID or name of the approver for the request.
871        :type approver: str, optional
872        :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
873        :type default_args_json: dict, optional
874        :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
875        :type dynamic_enums_json: dict, optional
876
877        :raises Exception: If the function is not called within a flow or flow preview.
878        :raises Exception: If the required flow job or flow step environment variables are not set.
879
880        :return: None
881
882        **Usage Example:**
883            >>> client.request_interactive_slack_approval(
884            ...     slack_resource_path="/u/alex/my_slack_resource",
885            ...     channel_id="admins-slack-channel",
886            ...     message="Please approve this request",
887            ...     approver="approver123",
888            ...     default_args_json={"key1": "value1", "key2": 42},
889            ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
890            ... )
891
892        **Notes:**
893        - This function must be executed within a Windmill flow or flow preview.
894        - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
895        """
896        workspace = self.workspace
897        flow_job_id = os.environ.get("WM_FLOW_JOB_ID")
898
899        if not flow_job_id:
900            raise Exception(
901                "You can't use 'request_interactive_slack_approval' function in a standalone script or flow step preview. Please use it in a flow or a flow preview."
902            )
903
904        # Only include non-empty parameters
905        params = {}
906        if message:
907            params["message"] = message
908        if approver:
909            params["approver"] = approver
910        if slack_resource_path:
911            params["slack_resource_path"] = slack_resource_path
912        if channel_id:
913            params["channel_id"] = channel_id
914        if os.environ.get("WM_FLOW_STEP_ID"):
915            params["flow_step_id"] = os.environ.get("WM_FLOW_STEP_ID")
916        if default_args_json:
917            params["default_args_json"] = json.dumps(default_args_json)
918        if dynamic_enums_json:
919            params["dynamic_enums_json"] = json.dumps(dynamic_enums_json)
920
921        self.get(
922            f"/w/{workspace}/jobs/slack_approval/{os.environ.get('WM_JOB_ID', 'NO_JOB_ID')}",
923            params=params,
924        )
def username_to_email(self, username: str) -> str:
926    def username_to_email(self, username: str) -> str:
927        """
928        Get email from workspace username
929        This method is particularly useful for apps that require the email address of the viewer.
930        Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
931        """
932        return self.get(f"/w/{self.workspace}/users/username_to_email/{username}").text
def send_teams_message(self, conversation_id: str, text: str, success: bool = True, card_block: dict = None):
934    def send_teams_message(
935        self,
936        conversation_id: str,
937        text: str,
938        success: bool = True,
939        card_block: dict = None,
940    ):
941        """
942        Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
943        """
944        return self.post(
945            "/teams/activities",
946            json={
947                "conversation_id": conversation_id,
948                "text": text,
949                "success": success,
950                "card_block": card_block,
951            },
952        )
def init_global_client(f):
955def init_global_client(f):
956    @functools.wraps(f)
957    def wrapper(*args, **kwargs):
958        global _client
959        if _client is None:
960            _client = Windmill()
961        return f(*args, **kwargs)
962
963    return wrapper
def deprecate(in_favor_of: str):
966def deprecate(in_favor_of: str):
967    def decorator(f):
968        @functools.wraps(f)
969        def wrapper(*args, **kwargs):
970            warnings.warn(
971                (
972                    f"The '{f.__name__}' method is deprecated and may be removed in the future. "
973                    f"Consider {in_favor_of}"
974                ),
975                DeprecationWarning,
976            )
977            return f(*args, **kwargs)
978
979        return wrapper
980
981    return decorator
@init_global_client
def get_workspace() -> str:
984@init_global_client
985def get_workspace() -> str:
986    return _client.workspace
@init_global_client
def get_root_job_id(job_id: str | None = None) -> str:
989@init_global_client
990def get_root_job_id(job_id: str | None = None) -> str:
991    return _client.get_root_job_id(job_id)
@init_global_client
@deprecate('Windmill().version')
def get_version() -> str:
994@init_global_client
995@deprecate("Windmill().version")
996def get_version() -> str:
997    return _client.version
@init_global_client
def run_script_async(hash_or_path: str, args: Dict[str, Any] = None, scheduled_in_secs: int = None) -> str:
1000@init_global_client
1001def run_script_async(
1002    hash_or_path: str,
1003    args: Dict[str, Any] = None,
1004    scheduled_in_secs: int = None,
1005) -> str:
1006    is_path = "/" in hash_or_path
1007    hash_ = None if is_path else hash_or_path
1008    path = hash_or_path if is_path else None
1009    return _client.run_script_async(
1010        hash_=hash_,
1011        path=path,
1012        args=args,
1013        scheduled_in_secs=scheduled_in_secs,
1014    )
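
A sketch that combines this with get_job_status and get_result (both defined further down) to fire a script and collect its output later; the script path is hypothetical:

```python
import time
import wmill

job_id = wmill.run_script_async("u/alex/my_script", args={"n": 3})

# Poll until the job leaves the queue and finishes
while wmill.get_job_status(job_id) != "COMPLETED":
    time.sleep(1)

result = wmill.get_result(job_id)
```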
@init_global_client
def run_flow_async(path: str, args: Dict[str, Any] = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str:
1017@init_global_client
1018def run_flow_async(
1019    path: str,
1020    args: Dict[str, Any] = None,
1021    scheduled_in_secs: int = None,
1022    # can only be set to false if the job will be fully awaited and not run concurrently
1023    # with any other job, as otherwise the child flow and its own children will store
1024    # their state in the parent job, which will lead to incorrectness and failures
1025    do_not_track_in_parent: bool = True,
1026) -> str:
1027    return _client.run_flow_async(
1028        path=path,
1029        args=args,
1030        scheduled_in_secs=scheduled_in_secs,
1031        do_not_track_in_parent=do_not_track_in_parent,
1032    )
@init_global_client
def run_script_sync(hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: datetime.timedelta = None) -> Any:
1035@init_global_client
1036def run_script_sync(
1037    hash: str,
1038    args: Dict[str, Any] = None,
1039    verbose: bool = False,
1040    assert_result_is_not_none: bool = True,
1041    cleanup: bool = True,
1042    timeout: dt.timedelta = None,
1043) -> Any:
1044    return _client.run_script(
1045        hash_=hash,
1046        args=args,
1047        verbose=verbose,
1048        assert_result_is_not_none=assert_result_is_not_none,
1049        cleanup=cleanup,
1050        timeout=timeout,
1051    )
@init_global_client
def run_script_by_path_async(path: str, args: Dict[str, Any] = None, scheduled_in_secs: Optional[int] = None) -> str:
1054@init_global_client
1055def run_script_by_path_async(
1056    path: str,
1057    args: Dict[str, Any] = None,
1058    scheduled_in_secs: Union[None, int] = None,
1059) -> str:
1060    return _client.run_script_by_path_async(
1061        path=path,
1062        args=args,
1063        scheduled_in_secs=scheduled_in_secs,
1064    )
@init_global_client
def run_script_by_hash_async(hash_: str, args: Dict[str, Any] = None, scheduled_in_secs: Optional[int] = None) -> str:
1067@init_global_client
1068def run_script_by_hash_async(
1069    hash_: str,
1070    args: Dict[str, Any] = None,
1071    scheduled_in_secs: Union[None, int] = None,
1072) -> str:
1073    return _client.run_script_by_hash_async(
1074        hash_=hash_,
1075        args=args,
1076        scheduled_in_secs=scheduled_in_secs,
1077    )
@init_global_client
def run_script_by_path_sync(path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: datetime.timedelta = None) -> Any:
1080@init_global_client
1081def run_script_by_path_sync(
1082    path: str,
1083    args: Dict[str, Any] = None,
1084    verbose: bool = False,
1085    assert_result_is_not_none: bool = True,
1086    cleanup: bool = True,
1087    timeout: dt.timedelta = None,
1088) -> Any:
1089    return _client.run_script(
1090        path=path,
1091        args=args,
1092        verbose=verbose,
1093        assert_result_is_not_none=assert_result_is_not_none,
1094        cleanup=cleanup,
1095        timeout=timeout,
1096    )
@init_global_client
def get_id_token(audience: str) -> str:
1099@init_global_client
1100def get_id_token(audience: str) -> str:
1101    """
1102    Get a JWT token for the given audience for OIDC purposes to login into third parties like AWS, Vault, GCP, etc.
1103    """
1104    return _client.get_id_token(audience)
@init_global_client
def get_job_status(job_id: str) -> Literal['RUNNING', 'WAITING', 'COMPLETED']:
1107@init_global_client
1108def get_job_status(job_id: str) -> JobStatus:
1109    return _client.get_job_status(job_id)
@init_global_client
def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
1112@init_global_client
1113def get_result(job_id: str, assert_result_is_not_none=True) -> Dict[str, Any]:
1114    return _client.get_result(
1115        job_id=job_id, assert_result_is_not_none=assert_result_is_not_none
1116    )
@init_global_client
def duckdb_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.DuckDbConnectionSettings:
1119@init_global_client
1120def duckdb_connection_settings(s3_resource_path: str = "") -> DuckDbConnectionSettings:
1121    """
1122    Convenience helper that takes an S3 resource as input and returns the settings necessary to
1123    initiate an S3 connection from DuckDB
1124    """
1125    return _client.get_duckdb_connection_settings(s3_resource_path)
@init_global_client
def polars_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.PolarsConnectionSettings:
1128@init_global_client
1129def polars_connection_settings(s3_resource_path: str = "") -> PolarsConnectionSettings:
1130    """
1131    Convenience helper that takes an S3 resource as input and returns the settings necessary to
1132    initiate an S3 connection from Polars
1133    """
1134    return _client.get_polars_connection_settings(s3_resource_path)
@init_global_client
def boto3_connection_settings(s3_resource_path: str = '') -> wmill.s3_types.Boto3ConnectionSettings:
1137@init_global_client
1138def boto3_connection_settings(s3_resource_path: str = "") -> Boto3ConnectionSettings:
1139    """
1140    Convenience helper that takes an S3 resource as input and returns the settings necessary to
1141    initiate an S3 connection using boto3
1142    """
1143    return _client.get_boto3_connection_settings(s3_resource_path)
@init_global_client
def load_s3_file(s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None = None) -> bytes:
1146@init_global_client
1147def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None = None) -> bytes:
1148    """
1149    Load the entire content of a file stored in S3 as bytes
1150    """
1151    return _client.load_s3_file(
1152        s3object, s3_resource_path if s3_resource_path != "" else None
1153    )
@init_global_client
def load_s3_file_reader(s3object: wmill.s3_types.S3Object | str, s3_resource_path: str | None = None) -> _io.BufferedReader:
1156@init_global_client
1157def load_s3_file_reader(
1158    s3object: S3Object | str, s3_resource_path: str | None = None
1159) -> BufferedReader:
1160    """
1161    Load the content of a file stored in S3
1162    """
1163    return _client.load_s3_file_reader(
1164        s3object, s3_resource_path if s3_resource_path != "" else None
1165    )
@init_global_client
def write_s3_file(s3object: wmill.s3_types.S3Object | str | None, file_content: _io.BufferedReader | bytes, s3_resource_path: str | None = None, content_type: str | None = None, content_disposition: str | None = None) -> wmill.s3_types.S3Object:
1168@init_global_client
1169def write_s3_file(
1170    s3object: S3Object | str | None,
1171    file_content: BufferedReader | bytes,
1172    s3_resource_path: str | None = None,
1173    content_type: str | None = None,
1174    content_disposition: str | None = None,
1175) -> S3Object:
1176    """
1177    Upload a file to S3
1178
1179    Content type will be automatically guessed from path extension if left empty
1180
1181    See MDN for content_disposition: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
1182    and content_type: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
1183
1184    """
1185    return _client.write_s3_file(
1186        s3object,
1187        file_content,
1188        s3_resource_path if s3_resource_path != "" else None,
1189        content_type,
1190        content_disposition,
1191    )
@init_global_client
def sign_s3_objects(s3_objects: list[wmill.s3_types.S3Object | str]) -> list[wmill.s3_types.S3Object]:
1194@init_global_client
1195def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]:
1196    """
1197    Sign S3 objects to be used by anonymous users in public apps
1198    Returns a list of signed s3 tokens
1199    """
1200    return _client.sign_s3_objects(s3_objects)
@init_global_client
def sign_s3_object(s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
1203@init_global_client
1204def sign_s3_object(s3_object: S3Object | str) -> S3Object:
1205    """
1206    Sign S3 object to be used by anonymous users in public apps
1207    Returns a signed s3 object
1208    """
1209    return _client.sign_s3_object(s3_object)
@init_global_client
def whoami() -> dict:
1212@init_global_client
1213def whoami() -> dict:
1214    """
1215    Returns the current user
1216    """
1217    return _client.user
@init_global_client
@deprecate('Windmill().state')
def get_state() -> Any:
1220@init_global_client
1221@deprecate("Windmill().state")
1222def get_state() -> Any:
1223    """
1224    Get the state
1225    """
1226    return _client.state
@init_global_client
def get_resource(path: str, none_if_undefined: bool = False) -> dict | None:
1229@init_global_client
1230def get_resource(
1231    path: str,
1232    none_if_undefined: bool = False,
1233) -> dict | None:
1234    """Get resource from Windmill"""
1235    return _client.get_resource(path, none_if_undefined)
@init_global_client
def set_resource(path: str, value: Any, resource_type: str = 'any') -> None:
1238@init_global_client
1239def set_resource(path: str, value: Any, resource_type: str = "any") -> None:
1240    """
1241    Set the resource at a given path as a string, creating it if it does not exist
1242    """
1243    return _client.set_resource(value=value, path=path, resource_type=resource_type)
@init_global_client
def set_state(value: Any) -> None:
1246@init_global_client
1247def set_state(value: Any) -> None:
1248    """
1249    Set the state
1250    """
1251    return _client.set_state(value)
@init_global_client
def set_progress(value: int, job_id: Optional[str] = None) -> None:
1254@init_global_client
1255def set_progress(value: int, job_id: Optional[str] = None) -> None:
1256    """
1257    Set the progress
1258    """
1259    return _client.set_progress(value, job_id)
@init_global_client
def get_progress(job_id: Optional[str] = None) -> Any:
1262@init_global_client
1263def get_progress(job_id: Optional[str] = None) -> Any:
1264    """
1265    Get the progress
1266    """
1267    return _client.get_progress(job_id)
def set_shared_state_pickle(value: Any, path='state.pickle') -> None:
1270def set_shared_state_pickle(value: Any, path="state.pickle") -> None:
1271    """
1272    Set the state in the shared folder using pickle
1273    """
1274    return Windmill.set_shared_state_pickle(value=value, path=path)
@deprecate('Windmill.get_shared_state_pickle(...)')
def get_shared_state_pickle(path='state.pickle') -> Any:
1277@deprecate("Windmill.get_shared_state_pickle(...)")
1278def get_shared_state_pickle(path="state.pickle") -> Any:
1279    """
1280    Get the state in the shared folder using pickle
1281    """
1282    return Windmill.get_shared_state_pickle(path=path)
def set_shared_state(value: Any, path='state.json') -> None:
1285def set_shared_state(value: Any, path="state.json") -> None:
1286    """
1287    Set the state in the shared folder using JSON
1288    """
1289    return Windmill.set_shared_state(value=value, path=path)
def get_shared_state(path='state.json') -> Any:
1292def get_shared_state(path="state.json") -> Any:
1293    """
1294    Get the state in the shared folder using JSON
1295    """
1296    return Windmill.get_shared_state(path=path)
@init_global_client
def get_variable(path: str) -> str:
1299@init_global_client
1300def get_variable(path: str) -> str:
1301    """
1302    Returns the variable at a given path as a string
1303    """
1304    return _client.get_variable(path)
@init_global_client
def set_variable(path: str, value: str, is_secret: bool = False) -> None:
1307@init_global_client
1308def set_variable(path: str, value: str, is_secret: bool = False) -> None:
1309    """
1310    Set the variable at a given path as a string, creating it if it does not exist
1311    """
1312    return _client.set_variable(path, value, is_secret)
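
A round-trip sketch (the variable path is hypothetical):

```python
import wmill

wmill.set_variable("u/alex/api_key", "s3cr3t", is_secret=True)
api_key = wmill.get_variable("u/alex/api_key")
```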
@init_global_client
def get_flow_user_state(key: str) -> Any:
1315@init_global_client
1316def get_flow_user_state(key: str) -> Any:
1317    """
1318    Get the user state of a flow at a given key
1319    """
1320    return _client.get_flow_user_state(key)
@init_global_client
def set_flow_user_state(key: str, value: Any) -> None:
1323@init_global_client
1324def set_flow_user_state(key: str, value: Any) -> None:
1325    """
1326    Set the user state of a flow at a given key
1327    """
1328    return _client.set_flow_user_state(key, value)
@init_global_client
def get_state_path() -> str:
1331@init_global_client
1332def get_state_path() -> str:
1333    return _client.state_path
@init_global_client
def get_resume_urls(approver: str = None) -> dict:
1336@init_global_client
1337def get_resume_urls(approver: str = None) -> dict:
1338    return _client.get_resume_urls(approver)
@init_global_client
def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None:
1341@init_global_client
1342def request_interactive_slack_approval(
1343    slack_resource_path: str,
1344    channel_id: str,
1345    message: str = None,
1346    approver: str = None,
1347    default_args_json: dict = None,
1348    dynamic_enums_json: dict = None,
1349) -> None:
1350    return _client.request_interactive_slack_approval(
1351        slack_resource_path=slack_resource_path,
1352        channel_id=channel_id,
1353        message=message,
1354        approver=approver,
1355        default_args_json=default_args_json,
1356        dynamic_enums_json=dynamic_enums_json,
1357    )
@init_global_client
def send_teams_message(conversation_id: str, text: str, success: bool, card_block: dict = None):
1360@init_global_client
1361def send_teams_message(
1362    conversation_id: str, text: str, success: bool, card_block: dict = None
1363):
1364    return _client.send_teams_message(conversation_id, text, success, card_block)
@init_global_client
def cancel_running() -> dict:
1367@init_global_client
1368def cancel_running() -> dict:
1369    """Cancel currently running executions of the same script."""
1370    return _client.cancel_running()
@init_global_client
def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
1373@init_global_client
1374def run_script(
1375    path: str = None,
1376    hash_: str = None,
1377    args: dict = None,
1378    timeout: dt.timedelta | int | float = None,
1379    verbose: bool = False,
1380    cleanup: bool = True,
1381    assert_result_is_not_none: bool = True,
1382) -> Any:
1383    """Run script synchronously and return its result.
1384
1385    Deprecated: use run_script_by_path or run_script_by_hash instead.
1386    """
1387    return _client.run_script(
1388        path=path,
1389        hash_=hash_,
1390        args=args,
1391        verbose=verbose,
1392        assert_result_is_not_none=assert_result_is_not_none,
1393        cleanup=cleanup,
1394        timeout=timeout,
1395    )
@init_global_client
def run_script_by_path(path: str, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
1398@init_global_client
1399def run_script_by_path(
1400    path: str,
1401    args: dict = None,
1402    timeout: dt.timedelta | int | float = None,
1403    verbose: bool = False,
1404    cleanup: bool = True,
1405    assert_result_is_not_none: bool = True,
1406) -> Any:
1407    """Run script by path synchronously and return its result."""
1408    return _client.run_script_by_path(
1409        path=path,
1410        args=args,
1411        verbose=verbose,
1412        assert_result_is_not_none=assert_result_is_not_none,
1413        cleanup=cleanup,
1414        timeout=timeout,
1415    )
@init_global_client
def run_script_by_hash(hash_: str, args: dict = None, timeout: datetime.timedelta | int | float = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = True) -> Any:
1418@init_global_client
1419def run_script_by_hash(
1420    hash_: str,
1421    args: dict = None,
1422    timeout: dt.timedelta | int | float = None,
1423    verbose: bool = False,
1424    cleanup: bool = True,
1425    assert_result_is_not_none: bool = True,
1426) -> Any:
1427    """Run script by hash synchronously and return its result."""
1428    return _client.run_script_by_hash(
1429        hash_=hash_,
1430        args=args,
1431        verbose=verbose,
1432        assert_result_is_not_none=assert_result_is_not_none,
1433        cleanup=cleanup,
1434        timeout=timeout,
1435    )
@init_global_client
def username_to_email(username: str) -> str:
1438@init_global_client
1439def username_to_email(username: str) -> str:
1440    """
1441    Get email from workspace username
1442    This method is particularly useful for apps that require the email address of the viewer.
1443    Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
1444    """
1445    return _client.username_to_email(username)
def task(*args, **kwargs):
1448def task(*args, **kwargs):
1449    from inspect import signature
1450
1451    def f(func, tag: str | None = None):
1452        if (
1453            os.environ.get("WM_JOB_ID") is None
1454            or os.environ.get("MAIN_OVERRIDE") == func.__name__
1455        ):
1456
1457            def inner(*args, **kwargs):
1458                return func(*args, **kwargs)
1459
1460            return inner
1461        else:
1462
1463            def inner(*args, **kwargs):
1464                global _client
1465                if _client is None:
1466                    _client = Windmill()
1467                w_id = os.environ.get("WM_WORKSPACE")
1468                job_id = os.environ.get("WM_JOB_ID")
1469                f_name = func.__name__
1470                # map positional args onto parameter names; avoid shadowing the json module
1471                call_args = kwargs
1472                params = list(signature(func).parameters)
1473                for i, arg in enumerate(args):
1474                    if i < len(params):
1475                        key = params[i]
1476                        if key not in kwargs:
1477                            call_args[key] = arg
1478
1479                params = {}
1480                if tag is not None:
1481                    params["tag"] = tag
1482                w_as_code_response = _client.post(
1483                    f"/w/{w_id}/jobs/run/workflow_as_code/{job_id}/{f_name}",
1484                    json={"args": call_args},
1485                    params=params,
1486                )
1487                job_id = w_as_code_response.text
1488                print(f"Executing task {func.__name__} on job {job_id}")
1489                job_result = _client.wait_job(job_id)
1490                print(f"Task {func.__name__} ({job_id}) completed")
1491                return job_result
1492
1493            return inner
1494
1495    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
1496        return f(args[0], None)
1497    else:
1498        return lambda x: f(x, kwargs.get("tag"))
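
A workflows-as-code sketch: outside a Windmill job the decorated functions run inline, while inside a job each call is submitted as its own sub-job and awaited (the worker tag is hypothetical, and `task` is assumed to be re-exported at the package root like the other helpers):

```python
import wmill

@wmill.task
def double(x: int) -> int:
    return x * 2

@wmill.task(tag="cpu_intensive")
def total(values: list) -> int:
    return sum(values)

def main():
    y = double(21)       # runs as its own job when executed in Windmill
    return total([y, 8])
```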
def parse_resource_syntax(s: str) -> Optional[str]:
1500def parse_resource_syntax(s: str) -> Optional[str]:
1501    """Parse resource syntax from string."""
1502    if s is None:
1503        return None
1504    if s.startswith("$res:"):
1505        return s[5:]
1506    if s.startswith("res://"):
1507        return s[6:]
1508    return None
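
The two accepted prefixes, for reference:

```python
parse_resource_syntax("$res:u/alex/my_resource")   # -> "u/alex/my_resource"
parse_resource_syntax("res://u/alex/my_resource")  # -> "u/alex/my_resource"
parse_resource_syntax("u/alex/my_resource")        # -> None (no recognized prefix)
```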
def parse_s3_object(s3_object: wmill.s3_types.S3Object | str) -> wmill.s3_types.S3Object:
1510def parse_s3_object(s3_object: S3Object | str) -> S3Object:
1511    """Parse S3 object from string or S3Object format."""
1512    if isinstance(s3_object, str):
1513        match = re.match(r'^s3://([^/]*)/(.*)$', s3_object)
1514        if match:
1515            return S3Object(s3=match.group(2) or "", storage=match.group(1) or None)
1516        return S3Object(s3=s3_object)  # plain string: treat it as the object key
1517    else:
1518        return s3_object
def parse_variable_syntax(s: str) -> Optional[str]:
1522def parse_variable_syntax(s: str) -> Optional[str]:
1523    """Parse variable syntax from string."""
1524    if s.startswith("var://"):
1525        return s[6:]
1526    return None
def append_to_result_stream(text: str) -> None:
1529def append_to_result_stream(text: str) -> None:
1530    """Append a text to the result stream.
1531    
1532    Args:
1533        text: text to append to the result stream
1534    """
1535    print("WM_STREAM: {}".format(text.replace(chr(10), '\\n')))
def stream_result(stream) -> None:
1537def stream_result(stream) -> None:
1538    """Stream to the result stream.
1539    
1540    Args:
1541        stream: stream to stream to the result stream
1542    """
1543    for text in stream:
1544        append_to_result_stream(text)
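
A sketch of streaming incremental output from a script, e.g. token-by-token text; any iterable of strings works:

```python
def main():
    chunks = ("Working", " on it", "... done")
    stream_result(chunks)  # each chunk is printed with the WM_STREAM: prefix
```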