ascii_library.orchestration.pipes.spark_pipes_factory module

ascii_library.orchestration.pipes.spark_pipes_factory.get_libs_dict(cfg)

  • Parameters: cfg (List[LibraryConfig] | None)
  • Return type: List[Mapping[str, str | Mapping]]
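The signature suggests this helper turns a list of library configs into the mapping shape that a cluster libraries API (such as the Databricks one) expects. A minimal, hypothetical sketch, using a stand-in `LibraryConfig` dataclass since the real class's fields are not documented here:

```python
from dataclasses import dataclass
from typing import List, Mapping, Optional, Union


@dataclass
class LibraryConfig:
    # Hypothetical stand-in for the real LibraryConfig: a library kind
    # (e.g. "pypi") plus the package spec to install.
    kind: str
    name: str


def get_libs_dict(
    cfg: Optional[List[LibraryConfig]],
) -> List[Mapping[str, Union[str, Mapping]]]:
    # Sketch: map each config to a nested mapping such as
    # {"pypi": {"package": "requests==2.31.0"}}; None yields an empty list.
    if cfg is None:
        return []
    return [{lib.kind: {"package": lib.name}} for lib in cfg]
```

The real implementation may use different field names or support more library kinds; this only illustrates the documented input and return types.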

ascii_library.orchestration.pipes.spark_pipes_factory.update_spot_bid_price_percent(fleet_config, new_spot_bid_price_percent)
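No docstring is given, but the name and the EMR-oriented parameters elsewhere in this module suggest it rewrites the spot bid percentage inside an EMR instance-fleet config. A hedged sketch, assuming the standard EMR `InstanceFleets` / `InstanceTypeConfigs` / `BidPriceAsPercentageOfOnDemandPrice` keys; the real function may operate on a different structure:

```python
from copy import deepcopy


def update_spot_bid_price_percent(fleet_config: dict, new_spot_bid_price_percent: int) -> dict:
    # Sketch: return a deep copy of the fleet config with every
    # BidPriceAsPercentageOfOnDemandPrice replaced by the new value,
    # leaving the original config untouched.
    updated = deepcopy(fleet_config)
    for fleet in updated.get("InstanceFleets", []):
        for type_config in fleet.get("InstanceTypeConfigs", []):
            type_config["BidPriceAsPercentageOfOnDemandPrice"] = new_spot_bid_price_percent
    return updated
```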

ascii_library.orchestration.pipes.spark_pipes_factory.generate_uploaded_script_paths(local_input_path, prefix='dbfs:/')

Retrieve the file name from the given path and construct the full upload path as: “dbfs:/external_pipes/&lt;filename&gt;.py”

  • Parameters:
    • local_input_path (str)
    • prefix (str)
  • Return type: str
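Following the docstring above, a minimal sketch of what this path construction could look like (the exact implementation in the library may differ):

```python
from pathlib import Path


def generate_uploaded_script_paths(local_input_path: str, prefix: str = "dbfs:/") -> str:
    # Sketch: keep only the file name from the local path and place it
    # under external_pipes/ on DBFS, per the documented target layout.
    filename = Path(local_input_path).name
    return f"{prefix}external_pipes/{filename}"
```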

ascii_library.orchestration.pipes.spark_pipes_factory.get_engine_from_config(config, spark_pipes_client)
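Given `BaseConfig.override_default_engine` below, this function presumably resolves which pipes client to use. A hypothetical, self-contained sketch with a stub client (the real `SparkPipesResource` attributes are assumptions):

```python
class SparkPipesClientStub:
    # Hypothetical stand-in: assume the resource holds one client per
    # engine plus a default engine name.
    def __init__(self):
        self.pyspark = "local-pyspark-client"
        self.emr = "emr-client"
        self.databricks = "databricks-client"
        self.default_engine = "pyspark"


def get_engine_from_config(config, spark_pipes_client):
    # Sketch: honor override_default_engine when set, otherwise fall
    # back to the client's default; reject unknown engine names.
    engine = getattr(config, "override_default_engine", None) or spark_pipes_client.default_engine
    if engine not in ("pyspark", "emr", "databricks"):
        raise ValueError(f"Unknown engine: {engine}")
    return getattr(spark_pipes_client, engine)
```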

ascii_library.orchestration.pipes.spark_pipes_factory.spark_pipes_asset_factory(name, key_prefix, spark_pipes_client, external_script_file, partitions_def=None, cfg=None, deps=None, group_name=None, local_spark_config=None, libraries_to_build_and_upload=None, databricks_cluster_config=None, libraries_config=None, emr_additional_libraries=None, dbr_additional_libraries=None, emr_job_config=None, fleet_filters=None)

Construct dagster-pipes based Spark assets for multiple engines: local PySpark, EMR, and Databricks.

Automatically configures the correct pipes client. For Databricks, it ensures dependencies and scripts are present and ready to execute: the dependent libraries are built automatically and uploaded to DBFS.

  • Parameters:
    • name (str)
    • key_prefix (Sequence[str])
    • spark_pipes_client (SparkPipesResource)
    • external_script_file (str)
    • partitions_def (PartitionsDefinition | None)
    • deps (Sequence[AssetsDefinition] | None)
    • group_name (str | None)
    • local_spark_config (Mapping[str, str] | None)
    • libraries_to_build_and_upload (Sequence[str])
    • databricks_cluster_config (dict[str, Any] | None)
    • libraries_config (List[LibraryConfig] | None)
    • emr_additional_libraries (List[LibraryConfig] | None)
    • dbr_additional_libraries (List[LibraryConfig] | None)
    • emr_job_config (dict | None)
    • fleet_filters (CloudInstanceConfig | None)
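To make the factory pattern concrete without depending on dagster, here is a heavily simplified, hypothetical sketch: the real function returns a dagster asset definition, while this stand-in just returns a closure carrying the asset metadata, to show the shape of a factory that binds name, key prefix, and script at construction time:

```python
from typing import Callable, Optional, Sequence


def spark_pipes_asset_factory_sketch(
    name: str,
    key_prefix: Sequence[str],
    external_script_file: str,
    group_name: Optional[str] = None,
) -> Callable[[], dict]:
    # Sketch only: bind the asset key and script path now, return a
    # callable that the orchestrator would later invoke.
    asset_key = list(key_prefix) + [name]

    def asset_fn() -> dict:
        return {
            "key": "/".join(asset_key),
            "script": external_script_file,
            "group": group_name,
        }

    return asset_fn
```

The real factory additionally wires in partitions, dependencies, cluster configs, and the engine-specific library lists documented above.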

class ascii_library.orchestration.pipes.spark_pipes_factory.BaseConfig

Bases: Config

Runtime knobs for Spark pipes.

Fields:

  • spot_bid_price_percent: percent of on-demand price to pay for spot (1–100).
  • override_default_engine: override engine. One of pyspark, emr, databricks.

spot_bid_price_percent : int | None

override_default_engine : str | None

model_config : ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'frozen': True, 'ignored_types': (<class 'functools.cached_property'>,)}

Configuration for the model; should be a dictionary conforming to pydantic’s ConfigDict.
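The documented field constraints can be illustrated with a stdlib-only stand-in (the real class is a frozen dagster/pydantic `Config`; this dataclass sketch only mirrors the stated rules, both fields optional):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BaseConfigSketch:
    # Stand-in for BaseConfig: spot_bid_price_percent must be 1-100 when
    # set; override_default_engine must be pyspark, emr, or databricks.
    spot_bid_price_percent: Optional[int] = None
    override_default_engine: Optional[str] = None

    def __post_init__(self):
        p = self.spot_bid_price_percent
        if p is not None and not (1 <= p <= 100):
            raise ValueError("spot_bid_price_percent must be in 1-100")
        engine = self.override_default_engine
        if engine is not None and engine not in ("pyspark", "emr", "databricks"):
            raise ValueError("override_default_engine must be one of pyspark, emr, databricks")
```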