
EMR Pipes Client

class ascii_library.orchestration.pipes.emr._PipesEmrClient

Bases: _PipesBaseCloudClient

Pipes client for EMR.

Args:
  • emr_job_runner (EmrJobRunner): An instance of EmrJobRunner.
  • env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the EMR job.
  • context_injector (Optional[PipesContextInjector]): A context injector used to inject context into the EMR process.
  • message_reader (Optional[PipesMessageReader]): A message reader used to read messages from the EMR job.
  • poll_interval_seconds (float): How long to sleep between checks of the job run's status.

__init__(emr_client, s3_client, price_client, bucket, context_injector=None, message_reader=None)

  • Parameters:
    • bucket (str)
    • context_injector (PipesContextInjector | None)
    • message_reader (PipesMessageReader | None)

write_block(content, s)

  • Parameters: s (str)

create_bootstrap_script(output_file='bootstrap.sh', bucket='ascii-supply-chain-research-pipeline', libraries=None)

  • Parameters:
    • output_file (str)
    • bucket (str)
    • libraries (List[LibraryConfig] | None)

handle_pypi(content, lib)

handle_wheel(bucket, content, lib)
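The methods above appear to cooperate in building an EMR bootstrap script: create_bootstrap_script walks the requested libraries and dispatches to handle_pypi or handle_wheel, each of which appends shell lines to a shared buffer. The following is a minimal sketch of that flow, not the library's implementation. The LibraryConfig fields (kind, name, version), the "wheels/" S3 prefix, the pip command line, and the render_bootstrap_script helper are all hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LibraryConfig:
    # Hypothetical stand-in for LibraryConfig; these field names are assumptions.
    kind: str  # "pypi" or "wheel"
    name: str  # PyPI package name, or the wheel filename for kind="wheel"
    version: Optional[str] = None  # only used for PyPI packages


def handle_pypi(content: List[str], lib: LibraryConfig) -> None:
    # Pin the version when one is given, e.g. "pandas==2.2.2".
    spec = f"{lib.name}=={lib.version}" if lib.version else lib.name
    content.append(f"sudo python3 -m pip install {spec}")


def handle_wheel(bucket: str, content: List[str], lib: LibraryConfig) -> None:
    # Assumption: wheels live under a "wheels/" prefix in the bucket.
    local_path = f"/tmp/{lib.name}"
    content.append(f"aws s3 cp s3://{bucket}/wheels/{lib.name} {local_path}")
    content.append(f"sudo python3 -m pip install {local_path}")


def render_bootstrap_script(bucket: str, libraries: Optional[List[LibraryConfig]] = None) -> str:
    # Hypothetical helper: build the script text without touching the filesystem.
    content = ["#!/bin/bash", "set -euo pipefail"]
    for lib in libraries or []:
        if lib.kind == "pypi":
            handle_pypi(content, lib)
        elif lib.kind == "wheel":
            handle_wheel(bucket, content, lib)
        else:
            raise ValueError(f"unsupported library kind: {lib.kind!r}")
    return "\n".join(content) + "\n"


def create_bootstrap_script(
    output_file: str = "bootstrap.sh",
    bucket: str = "ascii-supply-chain-research-pipeline",
    libraries: Optional[List[LibraryConfig]] = None,
) -> str:
    # Write the script locally; it still has to be uploaded to S3 before EMR can run it.
    with open(output_file, "w") as f:
        f.write(render_bootstrap_script(bucket, libraries))
    return output_file
```

Separating rendering from writing keeps the script logic testable without a filesystem or S3.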

modify_env_var(cluster_config, key, value)

  • Parameters:
    • cluster_config (dict)
    • key (str)
    • value (str)

extract_filename_without_extension(path)

  • Parameters: path (str)
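Judging by its name, extract_filename_without_extension returns the last path component with its final extension removed. The following is a minimal sketch using the standard library; the exact behavior of the real method (for example on multi-dot names) is an assumption.

```python
import os


def extract_filename_without_extension(path: str) -> str:
    # Take the last path component, then drop everything from its final "." onward.
    return os.path.splitext(os.path.basename(path))[0]
```

Only the last extension is stripped, so "archive.tar.gz" yields "archive.tar".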

prepare_emr_job(local_file_path, bucket, s3_path, emr_job_config, step_config, libraries_to_build_and_upload=None, libraries=None, extras=None)

  • Parameters:
    • local_file_path (str)
    • bucket (str)
    • s3_path (str)
    • emr_job_config (Dict[str, Any])
    • libraries_to_build_and_upload (List[str] | None)
    • libraries (List[LibraryConfig] | None)
    • extras (Mapping[str, Any] | None)
  • Return type: Tuple[Mapping[str, Any] | None, Dict[str, Any]]
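prepare_emr_job receives a local_file_path, a bucket and an s3_path, which suggests that one of its steps uploads the job script to S3 so that an EMR step can reference it. The following is a minimal sketch of such an upload; upload_job_script is a hypothetical helper, and the key layout (s3_path joined with the file's basename) is an assumption. The only real API it relies on is boto3's S3 client method upload_file(Filename, Bucket, Key).

```python
import os
import posixpath
from typing import Any


def upload_job_script(s3_client: Any, local_file_path: str, bucket: str, s3_path: str) -> str:
    """Upload the job script and return its s3:// URI (hypothetical helper)."""
    # S3 keys always use "/", whatever the local OS separator is.
    key = posixpath.join(s3_path.strip("/"), os.path.basename(local_file_path))
    s3_client.upload_file(local_file_path, bucket, key)
    return f"s3://{bucket}/{key}"
```

Passing the client in, rather than creating it inside the function, lets a test substitute a fake object for boto3.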

adjust_emr_job_config(emr_job_config, fleet_config)

  • Return type: dict
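adjust_emr_job_config takes a fleet_config, which points to rewriting the job's "Instances" section to use EMR instance fleets (the "InstanceFleets" key of the RunJobFlow request). The following is a minimal sketch under that assumption; the CloudInstanceConfig fields shown here are hypothetical, since this reference does not document them.

```python
import copy
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class CloudInstanceConfig:
    # Hypothetical stand-in; the real CloudInstanceConfig fields are not documented here.
    master_instance_types: List[str]
    core_instance_types: List[str]
    core_on_demand_capacity: int = 1
    core_spot_capacity: int = 0


def adjust_emr_job_config(emr_job_config: Dict[str, Any], fleet_config: CloudInstanceConfig) -> dict:
    config = copy.deepcopy(emr_job_config)  # leave the caller's dict untouched
    instances = config.setdefault("Instances", {})
    # Drop instance-group style keys so only the fleet definition remains.
    for key in ("InstanceGroups", "MasterInstanceType", "SlaveInstanceType", "InstanceCount"):
        instances.pop(key, None)
    instances["InstanceFleets"] = [
        {
            "Name": "master",
            "InstanceFleetType": "MASTER",
            "TargetOnDemandCapacity": 1,
            "InstanceTypeConfigs": [{"InstanceType": t} for t in fleet_config.master_instance_types],
        },
        {
            "Name": "core",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": fleet_config.core_on_demand_capacity,
            "TargetSpotCapacity": fleet_config.core_spot_capacity,
            "InstanceTypeConfigs": [{"InstanceType": t} for t in fleet_config.core_instance_types],
        },
    ]
    return config
```

Listing several instance types per fleet lets EMR pick whichever type has capacity, which is the main reason to prefer fleets for Spot workloads.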

submit_emr_job(bootstrap_env, emr_job_config, step_config, extras)

  • Parameters:
    • emr_job_config (dict)
    • extras (Mapping[str, Any])
  • Return type: str
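submit_emr_job returns a str, which matches the cluster ID ("JobFlowId") that boto3's EMR run_job_flow call returns. The following is a minimal sketch of such a submission, assuming that bootstrap_env holds the Pipes environment variables and that step_config is a command-runner.jar step whose first argument is "spark-submit". Forwarding the variables through spark.yarn.appMasterEnv.* and spark.executorEnv.* is this sketch's choice, not necessarily the library's, and the extras parameter is omitted.

```python
import copy
from typing import Any, Dict, Mapping, Optional


def submit_emr_job(
    emr_client: Any,
    bootstrap_env: Optional[Mapping[str, str]],
    emr_job_config: Dict[str, Any],
    step_config: Dict[str, Any],
) -> str:
    step = copy.deepcopy(step_config)
    args = step["HadoopJarStep"]["Args"]
    if not args or args[0] != "spark-submit":
        raise ValueError("sketch assumes a spark-submit step")
    conf_args = []
    for key, value in (bootstrap_env or {}).items():
        # appMasterEnv reaches the driver in YARN cluster mode; executorEnv reaches executors.
        conf_args += ["--conf", f"spark.yarn.appMasterEnv.{key}={value}"]
        conf_args += ["--conf", f"spark.executorEnv.{key}={value}"]
    args[1:1] = conf_args  # insert right after "spark-submit"
    request = copy.deepcopy(emr_job_config)
    request["Steps"] = request.get("Steps", []) + [step]
    response = emr_client.run_job_flow(**request)
    return response["JobFlowId"]
```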

run(context, emr_job_config, step_config, local_file_path, bucket, s3_path, libraries_to_build_and_upload=None, libraries=None, extras=None, fleet_config=None)

Synchronously execute an EMR job with the pipes protocol.

  • Parameters:
    • context (OpExecutionContext)
    • emr_job_config (dict)
    • local_file_path (str)
    • bucket (str)
    • s3_path (str)
    • libraries_to_build_and_upload (List[str] | None)
    • libraries (List[LibraryConfig] | None)
    • extras (Mapping[str, Any] | None)
    • fleet_config (CloudInstanceConfig | None)
  • Return type: PipesClientCompletedInvocation
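Because run executes synchronously, it has to block until the EMR job finishes, and the class docstring mentions a poll_interval_seconds setting for that purpose. The following is a minimal sketch of such a wait loop built on boto3's describe_cluster, assuming the cluster terminates after its steps (KeepJobFlowAliveWhenNoSteps set to False). wait_for_cluster is a hypothetical helper; the injectable sleep argument exists only to make the loop testable.

```python
import time
from typing import Any, Callable

TERMINAL_STATES = {"TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_for_cluster(
    emr_client: Any,
    cluster_id: str,
    poll_interval_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll describe_cluster until the cluster reaches a terminal state (sketch)."""
    while True:
        state = emr_client.describe_cluster(ClusterId=cluster_id)["Cluster"]["Status"]["State"]
        if state == "TERMINATED_WITH_ERRORS":
            raise RuntimeError(f"EMR cluster {cluster_id} terminated with errors")
        if state in TERMINAL_STATES:
            return state
        sleep(poll_interval_seconds)
```

boto3 also ships an EMR "cluster_terminated" waiter (emr_client.get_waiter("cluster_terminated")); a hand-written loop like this one mainly helps when each poll should also drain Pipes messages.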