Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import io 

2import subprocess 

3import time 

4from contextlib import redirect_stdout, redirect_stderr 

5import code 

6from datetime import datetime 

7import readline 

8import traceback 

9 

10from termcolor import colored, cprint 

11 

12CLI_REQUEST_NAME = "cli-request.json" 

13CLI_RESPONSE_NAME = "cli-response.json" 

14 

15def handle_pwd(spark, user_input, channel): 

16 channel.write_json( 

17 spark, 

18 CLI_RESPONSE_NAME, 

19 { 

20 "status": "ok", 

21 "output": os.getcwd() 

22 } 

23 ) 

24 

25def handle_bash(spark, user_input, channel): 

26 cmd_buffer = '\n'.join(user_input['lines']) 

27 f = io.StringIO() 

28 with redirect_stdout(f): 

29 p = subprocess.run(cmd_buffer, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 

30 

31 channel.write_json( 

32 spark, 

33 CLI_RESPONSE_NAME, 

34 { 

35 "status": "ok", 

36 "exit_code": p.returncode, 

37 "output": p.stdout.decode('utf-8'), 

38 } 

39 ) 

40 

41def handle_python(spark, user_input, console, channel): 

42 source = '\n'.join(user_input['lines']) 

43 stdout_f = io.StringIO() 

44 stderr_f = io.StringIO() 

45 with redirect_stdout(stdout_f): 

46 with redirect_stderr(stderr_f): 

47 console.runsource(source, symbol="exec") 

48 

49 channel.write_json( 

50 spark, 

51 CLI_RESPONSE_NAME, 

52 { 

53 "status": "ok", 

54 "output": stdout_f.getvalue() + "\n" + stderr_f.getvalue() , 

55 } 

56 ) 

57 

58class PySparkConsole(code.InteractiveInterpreter): 

59 def __init__(self, locals=None): 

60 super(PySparkConsole, self).__init__(locals=locals) 

61 

62 

63class CLIHandler: 

64 def __init__(self, client_channel, is_job_active, handlers): 

65 self.client_channel = client_channel 

66 if is_job_active is None: 

67 self.is_job_active = lambda : True 

68 else: 

69 self.is_job_active = is_job_active 

70 

71 self.handlers = handlers 

72 self.last_job_ok_time = None 

73 

74 

75 def loop(self): 

76 # line_mode can be "bash", "python" or "OFF" 

77 # When line_mode is OFF, you need send explicitly run @@bash or @@python 

78 # to submit a block of code to server 

79 # When line_mode is bash, each line is a bash script 

80 # when line_mode is python, each line is a python script 

81 line_mode = "off" 

82 

83 # if is_waiting_for_response is True, we need to pull server for cli-response.json 

84 # if is_waiting_for_response is False, we are free to enter new command 

85 is_waiting_for_response = True 

86 # command line buffer 

87 cli_lines = [] 

88 cli_wait_prompt = "-/|\\" 

89 cli_wait_prompt_idx = 0 

90 log_filename = None 

91 # commands 

92 # @@log -- all the output will be written to the log file as well 

93 # by default there is not log 

94 # @@nolog -- turn off log 

95 # @@clear -- clear command line buffer 

96 # @@load -- load a script from local file and append to command buffer 

97 # @@bash -- submit a bash script 

98 # @@python -- submit a python script 

99 # @@show -- show the command buffer 

100 # @@pwd -- show driver's current directory 

101 # @@quit -- quit the cli console 

102 

103 command = None 

104 while True: 

105 handle_server_ask(self.client_channel, self.handlers) 

106 if not is_waiting_for_response: 

107 if line_mode == "off": 

108 prompt = "> " 

109 elif line_mode == "bash": 

110 prompt = "bash> " 

111 else: 

112 prompt = "python> " 

113 

114 command = input(prompt) 

115 

116 if command == "@@quit": 

117 self.client_channel.write_json(CLI_REQUEST_NAME, {"type": "@@quit"}) 

118 is_waiting_for_response = True 

119 continue 

120 

121 if command == "@@pwd": 

122 self.client_channel.write_json(CLI_REQUEST_NAME, {"type": "@@pwd"}) 

123 is_waiting_for_response = True 

124 continue 

125 

126 if command == "@@bash": 

127 self.client_channel.write_json( 

128 CLI_REQUEST_NAME, 

129 { 

130 "type": "@@bash", 

131 "lines": cli_lines 

132 } 

133 ) 

134 is_waiting_for_response = True 

135 cli_lines = [] 

136 continue 

137 

138 if command == "@@python": 

139 self.client_channel.write_json( 

140 CLI_REQUEST_NAME, 

141 { 

142 "type": "@@python", 

143 "lines": cli_lines 

144 } 

145 ) 

146 is_waiting_for_response = True 

147 cli_lines = [] 

148 continue 

149 

150 if command.startswith("@@mode"): 

151 cmds = command.split(" ") 

152 if len(cmds) != 2 or cmds[1] not in ("off", "bash", "python"): 

153 print("Usage:") 

154 print("@@mode off") 

155 print("@@mode python") 

156 print("@@mode bash") 

157 else: 

158 line_mode = cmds[1] 

159 continue 

160 

161 if command.startswith("@@log"): 

162 cmds = command.split(" ") 

163 if len(cmds) != 2: 

164 print("Usage:") 

165 print("@@log <filename>") 

166 else: 

167 log_filename = cmds[1] 

168 continue 

169 

170 if command.startswith("@@load"): 

171 cmds = command.split(" ") 

172 if len(cmds) != 2: 

173 print("Usage:") 

174 print("@@load <filename>") 

175 else: 

176 try: 

177 with open(cmds[1], "rt") as load_f: 

178 for line in load_f: 

179 cli_lines.append(line.rstrip()) 

180 except Exception as e: 

181 print(f"Unable to read from file: {str(e)}") 

182 continue 

183 

184 if command == "@@clear": 

185 cli_lines = [] 

186 continue 

187 

188 if command == "@@show": 

189 for line in cli_lines: 

190 print(line) 

191 print() 

192 continue 

193 

194 # for any other command, we will append to the cli buffer 

195 if line_mode == "off": 

196 cli_lines.append(command) 

197 else: 

198 self.client_channel_write_json( 

199 CLI_REQUEST_NAME, 

200 { 

201 "type": "@@" + line_mode, 

202 "lines": [ command ] 

203 } 

204 ) 

205 is_waiting_for_response = True 

206 else: 

207 if self.client_channel.has_json(CLI_RESPONSE_NAME): 

208 response = self.client_channel.read_json(CLI_RESPONSE_NAME) 

209 self.client_channel.delete_json(CLI_RESPONSE_NAME) 

210 # print('#################################################') 

211 # print('# Response #') 

212 # print(f"# status : {response['status']}") 

213 # if 'exit_code' in response: 

214 # print(f"# exit_code: {response['exit_code']}") 

215 # print('#################################################') 

216 cprint(response['output'], 'green', 'on_red') 

217 if log_filename is not None: 

218 try: 

219 with open(log_filename, "a+t") as log_f: 

220 print(response['output'], file=log_f) 

221 print("", file=log_f) 

222 except Exception as e: 

223 print(f"Unable to write to file {log_filename}: {str(e)}") 

224 

225 if command == "@@quit": 

226 break 

227 

228 if self.is_job_active(): 

229 self.last_job_ok_time = datetime.utcnow() 

230 is_waiting_for_response = False 

231 else: 

232 print("Job quit unexpectedly!") 

233 break 

234 else: 

235 time.sleep(1) # do not sleep too long since this is an interactive session 

236 now = datetime.utcnow() 

237 if self.last_job_ok_time is None or (now - self.last_job_ok_time).total_seconds() >= 10: 

238 if self.is_job_active(): 

239 self.last_job_ok_time = now 

240 else: 

241 print("Job quit unexpectedly!") 

242 break 

243 

244 print(f"\r{cli_wait_prompt[cli_wait_prompt_idx]}\r", end="") 

245 cli_wait_prompt_idx = (cli_wait_prompt_idx + 1) % 4 

246 

247 

248def cli_main(spark, args, sysops={}): 

249 channel = sysops['channel'] 

250 console = PySparkConsole(locals={'spark': spark, 'sysops': sysops}) 

251 

252 channel.write_json( 

253 spark, 

254 CLI_RESPONSE_NAME, 

255 { 

256 "status": "ok", 

257 "output": "Welcome to OCI Spark-CLI Interface", 

258 } 

259 ) 

260 

261 while True: 

262 if not channel.has_json(spark, CLI_REQUEST_NAME): 

263 time.sleep(1) 

264 continue 

265 

266 user_input = channel.read_json(spark, CLI_REQUEST_NAME) 

267 channel.delete_json(spark, CLI_REQUEST_NAME) 

268 

269 if user_input["type"] == "@@quit": 

270 channel.write_json( 

271 spark, 

272 CLI_RESPONSE_NAME, 

273 { 

274 "status": "ok", 

275 "output": "Server quit gracefully", 

276 } 

277 ) 

278 break 

279 if user_input["type"] == "@@pwd": 

280 handle_pwd(spark, user_input, channel) 

281 continue 

282 if user_input["type"] == "@@bash": 

283 handle_bash(spark, user_input, channel) 

284 continue 

285 if user_input["type"] == "@@python": 

286 handle_python(spark, user_input, console, channel) 

287 continue 

288 return {"status": "ok"} 

289 

290SERVER_TO_CLIENT_REQUEST = "stc-request.json" 

291SERVER_TO_CLIENT_RESPONSE = "stc-response.json" 

292 

293class RemoteCallException(Exception): 

294 def __init__(self, status, exception_msg=None, exception_name=None): 

295 if status == 'nohandler': 

296 msg = 'remote call failed: no handler' 

297 elif status == 'exception': 

298 msg = f'remote call failed: exception, name="{exception_name}", msg="{exception_msg}"' 

299 else: 

300 msg =f'remote call failed: something is wrong' 

301 

302 super(RemoteCallException, self).__init__(msg) 

303 self.msg = msg 

304 self.status = status 

305 self.exception_msg = exception_msg 

306 self.exception_name = exception_name 

307 

308 

309def server_ask_client(spark, channel, content, timeout=600, check_interval=5): 

310 """Called by server, ask a question and wait for answer 

311 """ 

312 channel.write_json(spark, SERVER_TO_CLIENT_REQUEST, content) 

313 start_time = datetime.utcnow() 

314 while True: 

315 if channel.has_json(spark, SERVER_TO_CLIENT_RESPONSE): 

316 response = channel.read_json(spark, SERVER_TO_CLIENT_RESPONSE) 

317 channel.delete_json(spark, SERVER_TO_CLIENT_RESPONSE) 

318 if response['status'] == 'ok': 

319 return response['answer'] 

320 elif response['status'] == 'nohandler': 

321 raise RemoteCallException("nohandler") 

322 elif response['status'] == 'exception': 

323 exception_name = response.get('exception_name') 

324 exception_msg = response.get('exception_msg') 

325 raise RemoteCallException( 

326 "exception", 

327 exception_name = response.get('exception_name'), 

328 exception_msg = response.get('exception_msg') 

329 ) 

330 else: 

331 raise RemoteCallException(None) 

332 

333 

334 if (datetime.utcnow() - start_time).total_seconds() >= timeout: 

335 raise Exception("Ask is not answered: timed out") 

336 else: 

337 time.sleep(check_interval) 

338 

339 

340def handle_server_ask(channel, handlers): 

341 """Called by job submitter to answer ask from server with handlers 

342 """ 

343 if handlers is None: 

344 return 

345 

346 if not channel.has_json(SERVER_TO_CLIENT_REQUEST): 

347 return 

348 

349 content = channel.read_json(SERVER_TO_CLIENT_REQUEST) 

350 channel.delete_json(SERVER_TO_CLIENT_REQUEST) 

351 print(f"handle_server_ask: topic={content.get('topic')}") 

352 

353 for handler in handlers: 

354 try: 

355 handled, out = handler(content) 

356 if handled: 

357 channel.write_json( 

358 SERVER_TO_CLIENT_RESPONSE, 

359 { 

360 'status': 'ok', 

361 'answer': out 

362 } 

363 ) 

364 return 

365 except Exception as e: 

366 traceback.print_exc() 

367 channel.write_json( 

368 SERVER_TO_CLIENT_RESPONSE, 

369 { 

370 'status': 'exception', 

371 "exception_name": e.__class__.__name__, 

372 "exception_msg": str(e) 

373 } 

374 ) 

375 return 

376 

377 channel.write_json( 

378 SERVER_TO_CLIENT_RESPONSE, 

379 { 

380 'status': 'nohandler' 

381 } 

382 ) 

383 return 

384