EMR Pipes Client
class ascii_library.orchestration.pipes.emr._PipesEmrClient
Bases: _PipesBaseCloudClient
Pipes client for EMR.
Args:
- emr_job_runner (EmrJobRunner): An instance of EmrJobRunner.
- env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the EMR job.
- context_injector (Optional[PipesContextInjector]): A context injector used to inject context into the EMR process.
- message_reader (Optional[PipesMessageReader]): A message reader used to read messages from the EMR job.
- poll_interval_seconds (float): How long to sleep between checks of the job run's status.
__init__(emr_client, s3_client, price_client, bucket, context_injector=None, message_reader=None)
- Parameters:
- bucket (str)
- context_injector (PipesContextInjector | None)
- message_reader (PipesMessageReader | None)
write_block(content, s)
- Parameters: s (str)
create_bootstrap_script(output_file='bootstrap.sh', bucket='ascii-supply-chain-research-pipeline', libraries=None)
- Parameters:
- output_file (str)
- bucket (str)
- libraries (List[LibraryConfig] | None)
handle_pypi(content, lib)
handle_wheel(bucket, content, lib)
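The bootstrap helpers are documented only by signature. The sketch below shows one plausible way they fit together: `write_block` appends a line to an accumulating script, `handle_pypi` emits a `pip install` for a PyPI package, and `handle_wheel` copies a wheel from S3 before installing it. The `LibraryConfig` fields (`kind`, `name`, `version`), the `wheels/` key prefix, and the `build_bootstrap_script` helper are hypothetical. The real `create_bootstrap_script` also takes an `output_file` and may write and upload the script, which this sketch omits.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LibraryConfig:
    """Hypothetical stand-in for ascii_library's LibraryConfig."""

    kind: str  # "pypi" or "wheel" (assumed discriminator)
    name: str  # PyPI project name, or the wheel's filename in S3
    version: Optional[str] = None  # optional version pin for PyPI packages


def write_block(content: List[str], s: str) -> None:
    # Append one line to the script being built.
    content.append(s + "\n")


def handle_pypi(content: List[str], lib: LibraryConfig) -> None:
    # Install a package from PyPI, pinned if a version is given.
    spec = f"{lib.name}=={lib.version}" if lib.version else lib.name
    write_block(content, f"sudo python3 -m pip install '{spec}'")


def handle_wheel(bucket: str, content: List[str], lib: LibraryConfig) -> None:
    # Copy a pre-built wheel from S3 to the node, then install it.
    # The "wheels/" key prefix is a hypothetical layout.
    write_block(content, f"aws s3 cp s3://{bucket}/wheels/{lib.name} /tmp/{lib.name}")
    write_block(content, f"sudo python3 -m pip install /tmp/{lib.name}")


def build_bootstrap_script(bucket: str, libraries: Optional[List[LibraryConfig]] = None) -> str:
    # Hypothetical helper: returns the script text instead of writing output_file.
    content: List[str] = []
    write_block(content, "#!/bin/bash")
    write_block(content, "set -euo pipefail")  # abort on the first failing command
    for lib in libraries or []:
        if lib.kind == "pypi":
            handle_pypi(content, lib)
        elif lib.kind == "wheel":
            handle_wheel(bucket, content, lib)
        else:
            raise ValueError(f"unsupported library kind: {lib.kind!r}")
    return "".join(content)
```

Using `set -euo pipefail` makes a broken install fail the bootstrap action instead of being silently skipped, so the problem surfaces before any step runs.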
modify_env_var(cluster_config, key, value)
- Parameters:
- cluster_config (dict)
- key (str)
- value (str)
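`modify_env_var` is documented only by its parameters. Assuming `cluster_config` is a `run_job_flow` request dict, a standard way to set an environment variable on EMR is an `export` sub-classification nested under `spark-env` in the `Configurations` list. The sketch below does that; the `classification` parameter and the choice to return a modified copy rather than mutate in place are assumptions.

```python
import copy
from typing import Any, Dict


def modify_env_var(
    cluster_config: Dict[str, Any], key: str, value: str, classification: str = "spark-env"
) -> Dict[str, Any]:
    # Work on a copy so the caller's config is left untouched (assumed behavior).
    config = copy.deepcopy(cluster_config)
    configurations = config.setdefault("Configurations", [])

    # Find or create the top-level classification, e.g. {"Classification": "spark-env"}.
    env_block = next((c for c in configurations if c.get("Classification") == classification), None)
    if env_block is None:
        env_block = {"Classification": classification, "Configurations": []}
        configurations.append(env_block)

    # Environment variables live in a nested "export" classification.
    nested = env_block.setdefault("Configurations", [])
    export = next((c for c in nested if c.get("Classification") == "export"), None)
    if export is None:
        export = {"Classification": "export", "Properties": {}}
        nested.append(export)

    # Set or overwrite the variable.
    export.setdefault("Properties", {})[key] = value
    return config
```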
extract_filename_without_extension(path)
- Parameters: path (str)
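A minimal sketch of what `extract_filename_without_extension` likely computes, assuming it strips the directory part and the last extension. `posixpath` is used because S3 keys always use `/` as the separator, so local POSIX paths and `s3://` URIs are handled the same way.

```python
import posixpath


def extract_filename_without_extension(path: str) -> str:
    # "s3://bucket/scripts/job.py" -> "job.py" -> "job"
    # Only the last suffix is removed: "etl.tar.gz" -> "etl.tar".
    return posixpath.splitext(posixpath.basename(path))[0]
```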
prepare_emr_job(local_file_path, bucket, s3_path, emr_job_config, step_config, libraries_to_build_and_upload=None, libraries=None, extras=None)
- Parameters:
- local_file_path (str)
- bucket (str)
- s3_path (str)
- emr_job_config (Dict[str, Any])
- libraries_to_build_and_upload (List[str] | None)
- libraries (List[LibraryConfig] | None)
- extras (Mapping[str, Any] | None)
- Return type: Tuple[Mapping[str, Any] | None, Dict[str, Any]]
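`prepare_emr_job` takes a local script together with a `bucket` and `s3_path`, which suggests the script is uploaded to S3 and referenced from an EMR step. The sketch below illustrates that idea with the boto3 `upload_file(Filename, Bucket, Key)` call and the standard `command-runner.jar` step shape. `build_script_step`, the `spark-submit` arguments, and the `TERMINATE_CLUSTER` failure action are assumptions, not the library's actual behavior.

```python
import os
import posixpath
from typing import Any, Dict, Optional, Tuple


def build_script_step(
    local_file_path: str, bucket: str, s3_path: str, s3_client: Optional[Any] = None
) -> Tuple[str, Dict[str, Any]]:
    # Hypothetical helper: derive the S3 key from s3_path and the script's filename.
    filename = os.path.basename(local_file_path)
    key = posixpath.join(s3_path.strip("/"), filename)
    s3_uri = f"s3://{bucket}/{key}"

    # Upload the script if a boto3-style S3 client is supplied.
    if s3_client is not None:
        s3_client.upload_file(local_file_path, bucket, key)

    # An EMR step that runs the uploaded script with spark-submit.
    step = {
        "Name": f"Run {filename}",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster", s3_uri],
        },
    }
    return s3_uri, step
```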
adjust_emr_job_config(emr_job_config, fleet_config)
- Parameters:
- emr_job_config (dict)
- fleet_config (CloudInstanceConfig | None)
- Return type: dict
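Because `fleet_config` is optional, `adjust_emr_job_config` presumably leaves the config untouched when it is `None` and otherwise rewrites the cluster's instance layout. The sketch below assumes the adjustment replaces instance groups with EMR instance fleets in the `Instances` section of a `run_job_flow` request. `FleetConfig` is a hypothetical stand-in for `CloudInstanceConfig`, whose fields are not documented here.

```python
import copy
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FleetConfig:
    """Hypothetical stand-in for CloudInstanceConfig."""

    master_instance_types: List[str]
    core_instance_types: List[str]
    core_spot_capacity: int = 0
    core_on_demand_capacity: int = 1


def adjust_emr_job_config(
    emr_job_config: Dict[str, Any], fleet_config: Optional[FleetConfig]
) -> Dict[str, Any]:
    # No fleet requested: return the config unchanged.
    if fleet_config is None:
        return emr_job_config

    config = copy.deepcopy(emr_job_config)
    instances = config.setdefault("Instances", {})

    # A cluster uses either instance groups or instance fleets, not both,
    # so drop the group-style keys before adding fleets.
    for group_key in ("InstanceGroups", "MasterInstanceType", "SlaveInstanceType", "InstanceCount"):
        instances.pop(group_key, None)

    instances["InstanceFleets"] = [
        {
            "Name": "master",
            "InstanceFleetType": "MASTER",
            "TargetOnDemandCapacity": 1,
            "InstanceTypeConfigs": [{"InstanceType": t} for t in fleet_config.master_instance_types],
        },
        {
            "Name": "core",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": fleet_config.core_on_demand_capacity,
            "TargetSpotCapacity": fleet_config.core_spot_capacity,
            "InstanceTypeConfigs": [{"InstanceType": t} for t in fleet_config.core_instance_types],
        },
    ]
    return config
```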
submit_emr_job(bootstrap_env, emr_job_config, step_config, extras)
- Parameters:
- emr_job_config (dict)
- extras (Mapping[str, Any])
- Return type: str
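`submit_emr_job` returns a `str`, most plausibly the cluster (job flow) ID returned by the EMR `RunJobFlow` API. The sketch below assumes the step from `step_config` is appended to `Steps` and a bootstrap script is registered as a `ScriptBootstrapAction`. It replaces the undocumented `bootstrap_env` parameter with a plain `bootstrap_script_uri` and ignores `extras`; both are simplifications. Any object with a boto3-style `run_job_flow` method works, for example `boto3.client("emr")`.

```python
from typing import Any, Dict, Optional


def submit_emr_job(
    emr_client: Any,
    emr_job_config: Dict[str, Any],
    step_config: Dict[str, Any],
    bootstrap_script_uri: Optional[str] = None,  # hypothetical replacement for bootstrap_env
) -> str:
    # Shallow-copy the request so the caller's config is not modified.
    request = dict(emr_job_config)
    request["Steps"] = list(request.get("Steps", [])) + [step_config]

    # Register the bootstrap script so it runs on every node at startup.
    if bootstrap_script_uri is not None:
        request["BootstrapActions"] = list(request.get("BootstrapActions", [])) + [
            {
                "Name": "install-libraries",
                "ScriptBootstrapAction": {"Path": bootstrap_script_uri, "Args": []},
            }
        ]

    response = emr_client.run_job_flow(**request)
    return response["JobFlowId"]
```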
run(context, emr_job_config, step_config, local_file_path, bucket, s3_path, libraries_to_build_and_upload=None, libraries=None, extras=None, fleet_config=None)
Synchronously execute an EMR job with the pipes protocol.
- Parameters:
- context (OpExecutionContext)
- emr_job_config (dict)
- local_file_path (str)
- bucket (str)
- s3_path (str)
- libraries_to_build_and_upload (List[str] | None)
- libraries (List[LibraryConfig] | None)
- extras (Mapping[str, Any] | None)
- fleet_config (CloudInstanceConfig | None)
- Return type: PipesClientCompletedInvocation
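`run` is described as synchronous, so after submission it has to block until the EMR job finishes. The sketch below shows one way to do that, assuming the ID from `submit_emr_job` is the cluster ID of a transient cluster that terminates once its steps finish: it polls `describe_cluster` every `poll_interval_seconds` and raises on `TERMINATED_WITH_ERRORS`. The `wait_for_cluster` name and the injectable `sleep` are illustrative; reading Pipes messages while waiting is not shown.

```python
import time
from typing import Any, Callable

TERMINAL_STATES = {"TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_for_cluster(
    emr_client: Any,
    cluster_id: str,
    poll_interval_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    while True:
        # boto3 EMR: describe_cluster(ClusterId=...) -> {"Cluster": {"Status": {...}}}
        status = emr_client.describe_cluster(ClusterId=cluster_id)["Cluster"]["Status"]
        state = status["State"]
        if state in TERMINAL_STATES:
            if state == "TERMINATED_WITH_ERRORS":
                reason = status.get("StateChangeReason", {}).get("Message", "unknown reason")
                raise RuntimeError(f"EMR cluster {cluster_id} failed: {reason}")
            return state
        # Still starting, bootstrapping, running, or terminating: wait and poll again.
        sleep(poll_interval_seconds)
```

boto3 also provides a `cluster_terminated` waiter for the EMR client that implements a similar loop; an explicit loop makes it easier to interleave other work, such as reading Pipes messages, between polls.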