Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import json
2import os
3import uuid
4import tempfile
5import subprocess
6import json
7import time
9from spark_etl.job_submitters import AbstractJobSubmitter
10from spark_etl.core import ClientChannelInterface
11from spark_etl.utils import CLIHandler
class ClientChannel(ClientChannelInterface):
    """File-backed channel that exchanges JSON documents for a single run.

    Every document is stored as a file named ``name`` inside the directory
    ``<run_dir>/<run_id>``.
    """

    def __init__(self, run_dir, run_id):
        """Remember the base run directory and the id of the run."""
        self.run_dir = run_dir
        self.run_id = run_id

    def _get_json_path(self, name):
        """Return the path of document ``name`` under this run's directory."""
        run_folder = os.path.join(self.run_dir, self.run_id)
        return os.path.join(run_folder, name)

    def read_json(self, name):
        """Load and return the parsed JSON content of document ``name``."""
        json_path = self._get_json_path(name)
        with open(json_path, "r") as stream:
            content = json.load(stream)
        return content

    def has_json(self, name):
        """Return True when document ``name`` exists as a regular file."""
        json_path = self._get_json_path(name)
        return os.path.isfile(json_path)

    def write_json(self, name, payload):
        """Serialize ``payload`` as JSON into document ``name``, overwriting it."""
        json_path = self._get_json_path(name)
        with open(json_path, "w") as stream:
            json.dump(payload, stream)

    def delete_json(self, name):
        """Remove document ``name``; the error from ``os.remove`` propagates."""
        json_path = self._get_json_path(name)
        os.remove(json_path)
class PySparkJobSubmitter(AbstractJobSubmitter):
    """Job submitter that runs the job through a local ``spark-submit`` process."""

    def __init__(self, config):
        """Pass ``config`` on to the base submitter."""
        super(PySparkJobSubmitter, self).__init__(config)

    def run(self, deployment_location, options=None, args=None, handlers=None,
            on_job_submitted=None, cli_mode=False):
        """Launch the deployed job with ``spark-submit`` and return its result.

        A fresh run id is generated, ``<run_dir>/<run_id>/input.json`` is
        written with ``args``, and ``job_loader.py`` from
        ``deployment_location`` is started via ``spark-submit``. The method
        then polls the process once per second until it exits.

        Args:
            deployment_location: Directory holding the deployed application
                (including ``job_loader.py``); also passed as ``--app-dir``.
            options: Not used by this submitter.
            args: JSON-serializable job arguments written to ``input.json``.
                ``None`` is treated as an empty dict.
            handlers: Forwarded to ``CLIHandler`` when ``cli_mode`` is true;
                otherwise unused. ``None`` is treated as an empty list.
            on_job_submitted: Optional callback invoked as
                ``on_job_submitted(run_id, vendor_info={})`` right after the
                process has been started.
            cli_mode: When true, a ``CLIHandler`` loop is entered once while
                the process is still running.

        Returns:
            The parsed content of ``<run_dir>/<run_id>/result.json``.

        Raises:
            Exception: If ``spark-submit`` exits with a non-zero exit code.
        """
        # Avoid shared mutable defaults; normalise the omitted arguments here.
        args = {} if args is None else args
        handlers = [] if handlers is None else handlers

        # version is already baked into deployment_location
        run_id = str(uuid.uuid4())
        # NOTE(review): self.config is presumably set by AbstractJobSubmitter.__init__ -- confirm.
        run_dir = self.config['run_dir']
        app_dir = deployment_location
        run_home = os.path.join(run_dir, run_id)

        os.makedirs(run_home)

        # generate input.json
        with open(os.path.join(run_home, 'input.json'), 'wt') as f:
            json.dump(args, f)

        client_channel = ClientChannel(run_dir, run_id)
        p = subprocess.Popen([
            "spark-submit",
            os.path.join(deployment_location, "job_loader.py"),
            "--run-id", run_id,
            "--run-dir", run_dir,
            "--app-dir", app_dir,
        ])
        cli_entered = False

        if on_job_submitted is not None:
            on_job_submitted(run_id, vendor_info={})

        while True:
            time.sleep(1)
            exit_code = p.poll()
            if exit_code is not None:
                break

            # Enter the interactive CLI loop at most once; after it returns
            # we keep polling until the process has exited.
            if cli_mode and not cli_entered:
                cli_entered = True
                cli_handler = CLIHandler(
                    client_channel,
                    lambda: p.poll() is None,
                    handlers,
                )
                cli_handler.loop()

        if exit_code != 0:
            # The exception must actually be raised; otherwise a failed job
            # would be reported as successful below.
            raise Exception(f"Job failed, exit_code = {exit_code}")

        print("Job completed successfully")
        with open(os.path.join(run_home, "result.json"), "r") as f:
            return json.load(f)