import os
import json
import time
import subprocess
from urllib.parse import urlparse
import uuid
import tempfile

import requests
from requests.auth import HTTPBasicAuth

from .abstract_job_submitter import AbstractJobSubmitter
from spark_etl.utils import CLIHandler
from spark_etl.core import ClientChannelInterface
from spark_etl.exceptions import SparkETLLaunchFailure


class ClientChannel(ClientChannelInterface):
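    """Move JSON objects between the client and the job's HDFS run directory.

    All transfers are staged through a directory on a bridge host reachable
    over ssh/scp, so the client itself needs no direct HDFS access.
    """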
    def __init__(self, bridge, stage_dir, run_dir, run_id):
        self.bridge = bridge        # name of the bridge server we can ssh to
        self.stage_dir = stage_dir  # staging directory on the bridge
        self.run_dir = run_dir      # base dir for all runs, e.g. hdfs:///beta/etl/runs
        self.run_id = run_id        # unique id (string) of this run

    def _create_stage_dir(self):
        # We stage objects in stage_dir on the bridge rather than in a temp
        # dir: temp dirs are often memory-backed and cannot hold large objects.
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "mkdir", "-p", os.path.join(self.stage_dir, self.run_id)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )

    def read_json(self, name):
        """Read a JSON object with the given name from the HDFS run dir."""
        self._create_stage_dir()

        # copy from HDFS into the stage dir on the bridge first
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-copyToLocal", "-f",
                os.path.join(self.run_dir, self.run_id, name),
                os.path.join(self.stage_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
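
        # then scp the staged copy from the bridge into a local temp file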
        stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        stage_file.close()
        try:
            subprocess.check_call(
                [
                    "scp", "-q",
                    os.path.join(
                        f"{self.bridge}:{self.stage_dir}", self.run_id, name
                    ),
                    stage_file.name
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )

            with open(stage_file.name) as f:
                return json.load(f)
        finally:
            os.remove(stage_file.name)

    def has_json(self, name):
        """Check whether a file with the given name exists in the HDFS run dir."""
        exit_code = subprocess.call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-test", "-f",
                os.path.join(self.run_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
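        # "hdfs dfs -test -f" exits 0 if the path exists and 1 if it does not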
        if exit_code == 0:
            return True
        if exit_code == 1:
            return False
        raise Exception(f"Unrecognized exit code: {exit_code}")

    def write_json(self, name, payload):
        """Write a JSON object with the given name to the HDFS run dir."""
        self._create_stage_dir()

        stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        stage_file.close()
        try:
            with open(stage_file.name, "wt") as f:
                json.dump(payload, f)

            # copy it over to the stage directory on the bridge
            subprocess.check_call(
                [
                    "scp", "-q",
                    stage_file.name,
                    os.path.join(
                        f"{self.bridge}:{self.stage_dir}", self.run_id, name
                    )
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )

            # then upload to HDFS; -f overwrites the file if it already exists
            subprocess.check_call(
                [
                    "ssh", "-q", self.bridge,
                    "hdfs", "dfs", "-copyFromLocal", "-f",
                    os.path.join(self.stage_dir, self.run_id, name),
                    os.path.join(self.run_dir, self.run_id, name)
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )
        finally:
            os.remove(stage_file.name)

    def delete_json(self, name):
        """Delete the file with the given name from the HDFS run dir."""
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-rm",
                os.path.join(self.run_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )


class LivyJobSubmitter(AbstractJobSubmitter):
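    """Submit jobs through Livy's batches REST API and poll each batch until
    it reaches a terminal state (success, dead or killed).
    """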
    def __init__(self, config):
        super().__init__(config)

    # options carries vendor-specific arguments (options["conf"] is the Spark
    # job config); args is the payload handed to the application's main entry
    def run(self, deployment_location, options={}, args={}, handlers=[], on_job_submitted=None, cli_mode=False):
        o = urlparse(deployment_location)
        if o.scheme not in ("hdfs", "s3"):
            raise SparkETLLaunchFailure("deployment_location must be in hdfs or s3")

        bridge = self.config["bridge"]
        stage_dir = self.config["stage_dir"]
        run_dir = self.config["run_dir"]

        # create the run dir on HDFS
        run_id = str(uuid.uuid4())
        subprocess.check_call([
            "ssh", "-q", bridge,
            "hdfs", "dfs", "-mkdir", "-p",
            os.path.join(run_dir, run_id)
        ])
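
        # set up the client channel and hand args to the job by writing it
        # to input.json in the run dir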
        client_channel = ClientChannel(bridge, stage_dir, run_dir, run_id)
        client_channel.write_json("input.json", args)

        headers = {
            "Content-Type": "application/json",
            "X-Requested-By": "root",
            "proxyUser": "root"
        }
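
        # Livy batch payload: job_loader.py is the entry script, app.zip is
        # shipped as a pyFile, and the loader locates the run dir and lib.zip
        # through its command-line arguments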
        config = {
            "file": os.path.join(deployment_location, "job_loader.py"),
            "pyFiles": [os.path.join(deployment_location, "app.zip")],
            "args": [
                "--run-id", run_id,
                "--run-dir", os.path.join(run_dir, run_id),
                "--lib-zip", os.path.join(deployment_location, "lib.zip")
            ]
        }

        config.update(options)
        config.pop("display_name", None)  # the Livy submitter does not support display_name

        service_url = self.config["service_url"]
        username = self.config.get("username")
        password = self.config.get("password")
        # authenticate only when a username is configured; the same auth
        # setting is reused by get_job_status below
        auth = HTTPBasicAuth(username, password) if username is not None else None

        r = requests.post(
            os.path.join(service_url, "batches"),
            data=json.dumps(config),
            headers=headers,
            auth=auth,
            verify=False
        )
        if r.status_code not in (200, 201):
            msg = "Failed to submit the job, status: {}, error message: \"{}\"".format(
                r.status_code,
                r.content
            )
            raise SparkETLLaunchFailure(msg)

        print(f"job submitted, run_id = {run_id}")
        ret = json.loads(r.content.decode("utf8"))
        if on_job_submitted is not None:
            on_job_submitted(run_id, vendor_info=ret)
        job_id = ret["id"]
        print(f"job id: {job_id}")
        print(ret)
        print("logs:")
        for log in ret.get("log", []):
            print(log)

        def get_job_status():
            r = requests.get(
                os.path.join(service_url, "batches", str(job_id)),
                headers=headers,
                auth=auth,
                verify=False
            )
            if r.status_code != 200:
                msg = "Failed to get job status, status: {}, error message: \"{}\"".format(
                    r.status_code,
                    r.content
                )
                # this is considered unhandled
                raise Exception(msg)
            return json.loads(r.content.decode("utf8"))

        cli_entered = False
        # poll the job status until it reaches a terminal state
        while True:
            ret = get_job_status()
            job_state = ret["state"]
            appId = ret.get("appId")
            print(f"job_state: {job_state}, applicationId: {appId}")
            if job_state in ("success", "dead", "killed"):
                # YARN logs can be fetched on the bridge with:
                #   yarn logs -applicationId <appId>
                print(f"job_state={job_state}")
                if job_state == "success":
                    # on success the job leaves its output in result.json
                    return client_channel.read_json("result.json")
                raise SparkETLLaunchFailure("Job failed")
            time.sleep(5)

            # enter CLI mode the first time the job reaches the running state
            if cli_mode and not cli_entered and job_state == "running":
                cli_entered = True
                cli_handler = CLIHandler(
                    client_channel,
                    lambda: get_job_status()["state"] not in ("success", "dead", "killed"),
                    handlers
                )
                cli_handler.loop()
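

# A minimal usage sketch (illustrative only; the host names and paths below
# are assumptions, and deployment_location must already hold job_loader.py,
# app.zip and lib.zip):
#
#   submitter = LivyJobSubmitter({
#       "bridge": "bridge.example.com",
#       "stage_dir": "/data/spark-etl/stage",
#       "run_dir": "hdfs:///beta/etl/runs",
#       "service_url": "http://livy.example.com:8998",
#   })
#   result = submitter.run("hdfs:///apps/myapp/1.0.0", args={"action": "run"})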