import os
import json
import time
import subprocess
from urllib.parse import urlparse
import uuid
import tempfile

from requests.auth import HTTPBasicAuth
import requests

from .abstract_job_submitter import AbstractJobSubmitter
from spark_etl.utils import CLIHandler
from spark_etl.core import ClientChannelInterface
from spark_etl.exceptions import SparkETLLaunchFailure


class ClientChannel(ClientChannelInterface):
    def __init__(self, bridge, stage_dir, run_dir, run_id):
        self.bridge = bridge        # hostname of the bridge server we can ssh to
        self.stage_dir = stage_dir  # staging directory on the bridge
        self.run_dir = run_dir      # base dir for all runs, e.g. hdfs:///beta/etl/runs
        self.run_id = run_id        # string, unique id of the run

    def _create_stage_dir(self):
        # We stage objects in stage_dir on the bridge rather than in a temp
        # dir: in many cases the temp dir lives in memory and cannot hold
        # large objects.
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "mkdir", "-p", os.path.join(self.stage_dir, self.run_id)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )

    def read_json(self, name):
        """Read a JSON object with the given name from the HDFS run dir."""
        self._create_stage_dir()

        # copy from HDFS to the stage dir on the bridge first
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-copyToLocal", "-f",
                os.path.join(self.run_dir, self.run_id, name),
                os.path.join(self.stage_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )

        # create the local temp file before the try block, so the cleanup in
        # finally cannot hit an unbound name if creation fails
        stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        stage_file.close()
        try:
            # then pull it from the bridge to the local temp file
            subprocess.check_call(
                [
                    "scp", "-q",
                    os.path.join(
                        f"{self.bridge}:{self.stage_dir}", self.run_id, name
                    ),
                    stage_file.name
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )

            with open(stage_file.name) as f:
                return json.load(f)
        finally:
            os.remove(stage_file.name)
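
    # For reference, the two-hop copy above is roughly equivalent to these
    # shell steps (the paths are illustrative placeholders):
    #
    #   ssh -q <bridge> hdfs dfs -copyToLocal -f \
    #       <run_dir>/<run_id>/<name> <stage_dir>/<run_id>/<name>
    #   scp -q <bridge>:<stage_dir>/<run_id>/<name> <local-temp-file>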

    def has_json(self, name):
        """Check whether a file with the given name exists in the HDFS run dir."""
        exit_code = subprocess.call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-test", "-f",
                os.path.join(self.run_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
        if exit_code == 0:
            return True
        if exit_code == 1:
            return False
        raise Exception(f"Unrecognized exit code: {exit_code}")
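
    # Note on the exit codes above: `hdfs dfs -test -f <path>` exits 0 when
    # <path> exists and is a regular file, and 1 when it does not; ssh
    # propagates the remote command's exit code. Anything else (e.g. 255 from
    # an ssh failure) is surfaced as an error rather than guessed at.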

    def write_json(self, name, payload):
        """Write a JSON object with the given name to the HDFS run dir."""
        self._create_stage_dir()

        stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        stage_file.close()
        try:
            with open(stage_file.name, "wt") as f:
                json.dump(payload, f)

            # copy over to the stage directory on the bridge
            subprocess.check_call(
                [
                    "scp", "-q",
                    stage_file.name,
                    os.path.join(
                        f"{self.bridge}:{self.stage_dir}", self.run_id, name
                    )
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )

            # then upload to HDFS; -f overwrites the file if it already exists
            subprocess.check_call(
                [
                    "ssh", "-q", self.bridge,
                    "hdfs", "dfs", "-copyFromLocal", "-f",
                    os.path.join(self.stage_dir, self.run_id, name),
                    os.path.join(self.run_dir, self.run_id, name)
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )
        finally:
            os.remove(stage_file.name)

    def delete_json(self, name):
        """Delete the file with the given name from the HDFS run dir."""
        subprocess.check_call(
            [
                "ssh", "-q", self.bridge,
                "hdfs", "dfs", "-rm",
                os.path.join(self.run_dir, self.run_id, name)
            ],
            stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
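
# A minimal usage sketch of ClientChannel; the host and directory values are
# made-up placeholders, not taken from a real deployment:
#
#   channel = ClientChannel(
#       "bridge-host", "/data/stage", "hdfs:///beta/etl/runs", str(uuid.uuid4())
#   )
#   channel.write_json("input.json", {"dt": "2021-01-01"})
#   if channel.has_json("result.json"):
#       print(channel.read_json("result.json"))
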

class LivyJobSubmitter(AbstractJobSubmitter):
    def __init__(self, config):
        super().__init__(config)

    # options holds vendor-specific arguments merged into the Livy batch
    # request; its "conf" entry, if any, is the Spark job config.
    # args holds the arguments passed to the application's main entry point.

    def run(self, deployment_location, options={}, args={}, handlers=[], on_job_submitted=None, cli_mode=False):
        o = urlparse(deployment_location)
        if o.scheme not in ("hdfs", "s3"):
            raise SparkETLLaunchFailure("deployment_location must be an hdfs or s3 URL")

        bridge = self.config["bridge"]
        stage_dir = self.config["stage_dir"]
        run_dir = self.config["run_dir"]

        # create the run dir on HDFS
        run_id = str(uuid.uuid4())
        subprocess.check_call([
            "ssh", "-q", bridge,
            "hdfs", "dfs", "-mkdir", "-p",
            os.path.join(run_dir, run_id)
        ])

        client_channel = ClientChannel(bridge, stage_dir, run_dir, run_id)
        client_channel.write_json("input.json", args)

        headers = {
            "Content-Type": "application/json",
            "X-Requested-By": "root",
            "proxyUser": "root"
        }

        config = {
            "file": os.path.join(deployment_location, "job_loader.py"),
            "pyFiles": [os.path.join(deployment_location, "app.zip")],
            "args": [
                "--run-id", run_id,
                "--run-dir", os.path.join(run_dir, run_id),
                "--lib-zip", os.path.join(deployment_location, "lib.zip")
            ]
        }
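
        # The resulting Livy batch request body is roughly the following
        # (paths illustrative):
        #
        #   {
        #     "file": "<deployment_location>/job_loader.py",
        #     "pyFiles": ["<deployment_location>/app.zip"],
        #     "args": ["--run-id", "<run_id>",
        #              "--run-dir", "<run_dir>/<run_id>",
        #              "--lib-zip", "<deployment_location>/lib.zip"]
        #   }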

        config.update(options)
        config.pop("display_name", None)  # the Livy job submitter does not support display_name

        service_url = self.config["service_url"]
        username = self.config.get("username")
        password = self.config.get("password")

        # only send basic auth when a username is configured
        auth = HTTPBasicAuth(username, password) if username is not None else None

        r = requests.post(
            os.path.join(service_url, "batches"),
            data=json.dumps(config),
            headers=headers,
            auth=auth,
            verify=False
        )
        if r.status_code not in [200, 201]:
            msg = "Failed to submit the job, status: {}, error message: \"{}\"".format(
                r.status_code,
                r.text
            )
            raise SparkETLLaunchFailure(msg)

        print(f"job submitted, run_id = {run_id}")
        ret = json.loads(r.content.decode("utf8"))
        if on_job_submitted is not None:
            on_job_submitted(run_id, vendor_info=ret)
        job_id = ret["id"]
        print(f"job id: {job_id}")
        print(ret)
        print("logs:")
        for log in ret.get("log", []):
            print(log)

        def get_job_status():
            # reuse the same auth as the submit call: None when no username
            # is configured, so we do not send a bogus "None:None" credential
            r = requests.get(
                os.path.join(service_url, "batches", str(job_id)),
                headers=headers,
                auth=auth,
                verify=False
            )
            if r.status_code != 200:
                msg = "Failed to get job status, status: {}, error message: \"{}\"".format(
                    r.status_code,
                    r.text
                )
                # this is considered unhandled
                raise Exception(msg)
            return json.loads(r.content.decode("utf8"))

        cli_entered = False
        # poll the job status until it reaches a terminal state
        while True:
            ret = get_job_status()
            job_state = ret["state"]
            app_id = ret.get("appId")
            print(f"job_state: {job_state}, applicationId: {app_id}")
            if job_state in ("success", "dead", "killed"):
                # cmd = f"yarn logs -applicationId {app_id}"
                # host = self.config["bridge"]
                # subprocess.call(["ssh", "-q", "-t", host, cmd], shell=False)
                print(f"job_state={job_state}")
                if job_state == "success":
                    return client_channel.read_json("result.json")
                raise SparkETLLaunchFailure("Job failed")
            time.sleep(5)

            # enter CLI mode the first time the job reaches the running state
            if cli_mode and not cli_entered and job_state == "running":
                cli_entered = True
                cli_handler = CLIHandler(
                    client_channel,
                    lambda: get_job_status()["state"] not in ("success", "dead", "killed"),
                    handlers
                )
                cli_handler.loop()
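
# A minimal end-to-end sketch; all values below are made-up placeholders, but
# the config keys match those read above (bridge, stage_dir, run_dir,
# service_url and, optionally, username/password):
#
#   submitter = LivyJobSubmitter({
#       "bridge":      "bridge-host",
#       "stage_dir":   "/data/stage",
#       "run_dir":     "hdfs:///beta/etl/runs",
#       "service_url": "http://livy-host:8998",
#   })
#   result = submitter.run(
#       "hdfs:///apps/myapp/1.0.0",
#       args={"dt": "2021-01-01"},
#   )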