Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2import subprocess
3import time
4from contextlib import redirect_stdout, redirect_stderr
5import code
6from datetime import datetime
7import readline
8import traceback
10from termcolor import colored, cprint
# Names of the JSON documents exchanged over the channel for the CLI session.
# The client (CLIHandler.loop) writes the request and reads the response;
# the server (cli_main and its handle_* helpers) does the opposite.
CLI_REQUEST_NAME = "cli-request.json"
CLI_RESPONSE_NAME = "cli-response.json"
def handle_pwd(spark, user_input, channel):
    """Answer a ``@@pwd`` request with the current working directory.

    Args:
        spark: passed through to ``channel.write_json`` as its first argument.
        user_input: the decoded request; not inspected by this handler.
        channel: object whose ``write_json(spark, name, payload)`` publishes
            the response.
    """
    # Local import: the module's import block does not bring in ``os``, so
    # the original ``os.getcwd()`` call failed with NameError.
    import os

    channel.write_json(
        spark,
        CLI_RESPONSE_NAME,
        {
            "status": "ok",
            "output": os.getcwd(),
        },
    )
def handle_bash(spark, user_input, channel):
    """Run the submitted lines as one shell script and publish its output.

    The lines in ``user_input['lines']`` are joined with newlines and run
    through the shell. stderr is merged into stdout, and the combined output
    plus the exit code are written as the CLI response.

    Args:
        spark: passed through to ``channel.write_json`` as its first argument.
        user_input: request dict; ``user_input['lines']`` is the script text,
            one entry per line.
        channel: object whose ``write_json(spark, name, payload)`` publishes
            the response.
    """
    script = '\n'.join(user_input['lines'])
    # NOTE(review): shell=True on client-supplied text is the purpose of this
    # handler (a remote shell), so it is kept -- make sure the channel itself
    # is only writable by trusted users.
    completed = subprocess.run(
        script,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    channel.write_json(
        spark,
        CLI_RESPONSE_NAME,
        {
            "status": "ok",
            "exit_code": completed.returncode,
            # Commands may emit bytes that are not valid UTF-8; replace them
            # rather than raising and taking the whole server loop down.
            "output": completed.stdout.decode('utf-8', errors='replace'),
        },
    )
def handle_python(spark, user_input, console, channel):
    """Execute the submitted lines in ``console`` and publish what it printed.

    Args:
        spark: passed through to ``channel.write_json`` as its first argument.
        user_input: request dict; ``user_input['lines']`` holds the source,
            one entry per line.
        console: interpreter whose ``runsource(source, symbol="exec")`` runs
            the code.
        channel: object whose ``write_json(spark, name, payload)`` publishes
            the response.
    """
    program = '\n'.join(user_input['lines'])
    captured_out = io.StringIO()
    captured_err = io.StringIO()

    # Capture everything the snippet writes to stdout/stderr while it runs.
    with redirect_stdout(captured_out), redirect_stderr(captured_err):
        console.runsource(program, symbol="exec")

    # stdout first, then stderr, separated by a single newline.
    combined = "\n".join((captured_out.getvalue(), captured_err.getvalue()))
    payload = {
        "status": "ok",
        "output": combined,
    }
    channel.write_json(spark, CLI_RESPONSE_NAME, payload)
class PySparkConsole(code.InteractiveInterpreter):
    """Interpreter used by the server to run Python snippets.

    Adds nothing to ``code.InteractiveInterpreter``; it only forwards the
    optional ``locals`` namespace to the base class.
    """

    def __init__(self, locals=None):
        # ``locals`` shadows the builtin, but it is the keyword callers use.
        code.InteractiveInterpreter.__init__(self, locals=locals)
class CLIHandler:
    """Client-side interactive console that talks to the server over a channel.

    The handler writes requests to ``CLI_REQUEST_NAME`` and polls for replies
    in ``CLI_RESPONSE_NAME`` using the channel's ``write_json``, ``has_json``,
    ``read_json`` and ``delete_json`` methods.
    """

    def __init__(self, client_channel, is_job_active, handlers):
        """Store the collaborators used by :meth:`loop`.

        Args:
            client_channel: channel object used to exchange JSON documents.
            is_job_active: zero-argument callable returning a truthy value
                while the remote job is alive; ``None`` means "assume it is
                always alive".
            handlers: passed to ``handle_server_ask`` on every loop iteration
                (may be ``None``).
        """
        self.client_channel = client_channel
        if is_job_active is None:
            self.is_job_active = lambda: True
        else:
            self.is_job_active = is_job_active

        self.handlers = handlers
        # Time of the most recent successful liveness check, None until the first.
        self.last_job_ok_time = None

    def loop(self):
        """Run the interactive session until ``@@quit`` completes or the job dies.

        Commands understood at the prompt:
            @@mode <off|bash|python> -- select line mode (see below)
            @@log <filename>  -- also append every response to this file
                                 (logging is off by default)
            @@nolog           -- turn logging off
            @@clear           -- clear the command line buffer
            @@load <filename> -- append a local file's lines to the buffer
            @@bash            -- submit the buffer as a bash script
            @@python          -- submit the buffer as a python script
            @@show            -- print the command buffer
            @@pwd             -- show the driver's current directory
            @@quit            -- quit the cli console

        Line mode is "off", "bash" or "python". When it is "off", input lines
        accumulate in the buffer until @@bash or @@python submits them. In
        "bash"/"python" mode each input line is submitted immediately as a
        one-line script of that kind.
        """
        line_mode = "off"

        # True  -> poll the channel for cli-response.json
        # False -> free to read a new command from the user
        # Starts True so the server's first response is consumed before prompting.
        is_waiting_for_response = True
        # command line buffer
        cli_lines = []
        cli_wait_prompt = "-/|\\"
        cli_wait_prompt_idx = 0
        log_filename = None

        command = None
        while True:
            handle_server_ask(self.client_channel, self.handlers)
            if not is_waiting_for_response:
                if line_mode == "off":
                    prompt = "> "
                elif line_mode == "bash":
                    prompt = "bash> "
                else:
                    prompt = "python> "

                command = input(prompt)

                if command == "@@quit":
                    self.client_channel.write_json(CLI_REQUEST_NAME, {"type": "@@quit"})
                    is_waiting_for_response = True
                    continue

                if command == "@@pwd":
                    self.client_channel.write_json(CLI_REQUEST_NAME, {"type": "@@pwd"})
                    is_waiting_for_response = True
                    continue

                if command == "@@bash":
                    self.client_channel.write_json(
                        CLI_REQUEST_NAME,
                        {
                            "type": "@@bash",
                            "lines": cli_lines
                        }
                    )
                    is_waiting_for_response = True
                    cli_lines = []
                    continue

                if command == "@@python":
                    self.client_channel.write_json(
                        CLI_REQUEST_NAME,
                        {
                            "type": "@@python",
                            "lines": cli_lines
                        }
                    )
                    is_waiting_for_response = True
                    cli_lines = []
                    continue

                if command.startswith("@@mode"):
                    cmds = command.split(" ")
                    if len(cmds) != 2 or cmds[1] not in ("off", "bash", "python"):
                        print("Usage:")
                        print("@@mode off")
                        print("@@mode python")
                        print("@@mode bash")
                    else:
                        line_mode = cmds[1]
                    continue

                if command == "@@nolog":
                    # Previously unimplemented: the text fell through and was
                    # treated as ordinary input.
                    log_filename = None
                    continue

                if command.startswith("@@log"):
                    cmds = command.split(" ")
                    if len(cmds) != 2:
                        print("Usage:")
                        print("@@log <filename>")
                    else:
                        log_filename = cmds[1]
                    continue

                if command.startswith("@@load"):
                    cmds = command.split(" ")
                    if len(cmds) != 2:
                        print("Usage:")
                        print("@@load <filename>")
                    else:
                        try:
                            with open(cmds[1], "rt") as load_f:
                                for line in load_f:
                                    cli_lines.append(line.rstrip())
                        except Exception as e:
                            print(f"Unable to read from file: {str(e)}")
                    continue

                if command == "@@clear":
                    cli_lines = []
                    continue

                if command == "@@show":
                    for line in cli_lines:
                        print(line)
                    print()
                    continue

                # for any other command, we will append to the cli buffer
                if line_mode == "off":
                    cli_lines.append(command)
                else:
                    # Line mode: ship this single line right away.
                    self.client_channel.write_json(
                        CLI_REQUEST_NAME,
                        {
                            "type": "@@" + line_mode,
                            "lines": [command]
                        }
                    )
                    is_waiting_for_response = True
            else:
                if self.client_channel.has_json(CLI_RESPONSE_NAME):
                    response = self.client_channel.read_json(CLI_RESPONSE_NAME)
                    self.client_channel.delete_json(CLI_RESPONSE_NAME)
                    cprint(response['output'], 'green', 'on_red')
                    if log_filename is not None:
                        try:
                            with open(log_filename, "a+t") as log_f:
                                print(response['output'], file=log_f)
                                print("", file=log_f)
                        except Exception as e:
                            print(f"Unable to write to file {log_filename}: {str(e)}")

                    # The response just printed was the server's reply to @@quit.
                    if command == "@@quit":
                        break

                    if self.is_job_active():
                        self.last_job_ok_time = datetime.utcnow()
                        is_waiting_for_response = False
                    else:
                        print("Job quit unexpectedly!")
                        break
                else:
                    time.sleep(1)  # do not sleep too long since this is an interactive session
                    now = datetime.utcnow()
                    # Re-check job liveness at most once every 10 seconds.
                    if self.last_job_ok_time is None or (now - self.last_job_ok_time).total_seconds() >= 10:
                        if self.is_job_active():
                            self.last_job_ok_time = now
                        else:
                            print("Job quit unexpectedly!")
                            break

                    # Spinner so the user can see we are still waiting.
                    print(f"\r{cli_wait_prompt[cli_wait_prompt_idx]}\r", end="")
                    cli_wait_prompt_idx = (cli_wait_prompt_idx + 1) % len(cli_wait_prompt)
def cli_main(spark, args, sysops=None):
    """Server-side entry point: serve CLI requests until ``@@quit`` arrives.

    Publishes a welcome response, then polls the channel once per second for
    ``CLI_REQUEST_NAME``, dispatches each request on its ``"type"`` and writes
    exactly one response per request.

    Args:
        spark: passed to every channel call and exposed to executed Python
            code as ``spark``.
        args: not used by this function.
        sysops: dict that must contain ``'channel'``; it is also exposed to
            executed Python code as ``sysops``.

    Returns:
        ``{"status": "ok"}`` after a ``@@quit`` request has been answered.

    Raises:
        KeyError: if ``sysops`` has no ``'channel'`` entry.
    """
    # Avoid a shared mutable default: the dict is handed to user code below.
    if sysops is None:
        sysops = {}

    channel = sysops['channel']
    console = PySparkConsole(locals={'spark': spark, 'sysops': sysops})

    channel.write_json(
        spark,
        CLI_RESPONSE_NAME,
        {
            "status": "ok",
            "output": "Welcome to OCI Spark-CLI Interface",
        }
    )

    while True:
        if not channel.has_json(spark, CLI_REQUEST_NAME):
            time.sleep(1)
            continue

        user_input = channel.read_json(spark, CLI_REQUEST_NAME)
        channel.delete_json(spark, CLI_REQUEST_NAME)

        request_type = user_input.get("type")
        if request_type == "@@quit":
            channel.write_json(
                spark,
                CLI_RESPONSE_NAME,
                {
                    "status": "ok",
                    "output": "Server quit gracefully",
                }
            )
            break
        elif request_type == "@@pwd":
            handle_pwd(spark, user_input, channel)
        elif request_type == "@@bash":
            handle_bash(spark, user_input, channel)
        elif request_type == "@@python":
            handle_python(spark, user_input, console, channel)
        else:
            # The client blocks until a response appears, so an unrecognised
            # request must still be answered instead of being dropped.
            channel.write_json(
                spark,
                CLI_RESPONSE_NAME,
                {
                    "status": "error",
                    "output": f"Unknown request type: {request_type!r}",
                }
            )
    return {"status": "ok"}
# Names of the JSON documents used when the server asks the client a question:
# server_ask_client writes the request and waits for the response;
# handle_server_ask reads the request and writes the response.
SERVER_TO_CLIENT_REQUEST = "stc-request.json"
SERVER_TO_CLIENT_RESPONSE = "stc-response.json"
class RemoteCallException(Exception):
    """Raised when a remote call did not produce an ``ok`` answer.

    Attributes (all set in ``__init__``):
        status: the status passed in ('nohandler', 'exception', or anything else).
        msg: the human-readable message, also used as the exception text.
        exception_msg: remote exception message, if supplied.
        exception_name: remote exception class name, if supplied.
    """

    def __init__(self, status, exception_msg=None, exception_name=None):
        if status == 'exception':
            text = f'remote call failed: exception, name="{exception_name}", msg="{exception_msg}"'
        elif status == 'nohandler':
            text = 'remote call failed: no handler'
        else:
            # Any other status (including None) is reported generically.
            text = 'remote call failed: something is wrong'

        Exception.__init__(self, text)
        self.status = status
        self.msg = text
        self.exception_name = exception_name
        self.exception_msg = exception_msg
def server_ask_client(spark, channel, content, timeout=600, check_interval=5):
    """Called by server, ask a question and wait for answer

    Writes ``content`` as the server-to-client request, then polls the channel
    for the response document, deleting it once read.

    Args:
        spark: passed through as the first argument of every channel call.
        channel: object providing ``write_json``, ``has_json``, ``read_json``
            and ``delete_json``.
        content: the request payload to publish.
        timeout: seconds to wait before giving up.
        check_interval: seconds to sleep between polls.

    Returns:
        ``response['answer']`` when the response status is ``'ok'``.

    Raises:
        RemoteCallException: when the response status is ``'nohandler'``,
            ``'exception'`` or anything other than ``'ok'``.
        Exception: when no response arrives within ``timeout`` seconds.
    """
    channel.write_json(spark, SERVER_TO_CLIENT_REQUEST, content)
    # Use the monotonic clock so wall-clock adjustments cannot shorten or
    # extend the wait.
    start = time.monotonic()
    while True:
        if channel.has_json(spark, SERVER_TO_CLIENT_RESPONSE):
            response = channel.read_json(spark, SERVER_TO_CLIENT_RESPONSE)
            channel.delete_json(spark, SERVER_TO_CLIENT_RESPONSE)

            status = response['status']
            if status == 'ok':
                return response['answer']
            if status == 'nohandler':
                raise RemoteCallException("nohandler")
            if status == 'exception':
                raise RemoteCallException(
                    "exception",
                    exception_name=response.get('exception_name'),
                    exception_msg=response.get('exception_msg'),
                )
            raise RemoteCallException(None)

        if time.monotonic() - start >= timeout:
            raise Exception("Ask is not answered: timed out")
        time.sleep(check_interval)
def handle_server_ask(channel, handlers):
    """Called by job submitter to answer ask from server with handlers

    Does nothing when ``handlers`` is None or no request is pending.
    Otherwise the request is read and deleted, and each handler is called
    with it in order. A handler returns a ``(handled, answer)`` pair, and the
    first one that reports ``handled`` supplies the ``'ok'`` response. If a
    handler raises, the traceback is printed and an ``'exception'`` response
    is written instead. When no handler takes the request, a ``'nohandler'``
    response is written.
    """
    if handlers is None or not channel.has_json(SERVER_TO_CLIENT_REQUEST):
        return

    request = channel.read_json(SERVER_TO_CLIENT_REQUEST)
    channel.delete_json(SERVER_TO_CLIENT_REQUEST)
    print(f"handle_server_ask: topic={request.get('topic')}")

    for candidate in handlers:
        try:
            handled, answer = candidate(request)
            if not handled:
                # Not this handler's topic; try the next one.
                continue
            channel.write_json(
                SERVER_TO_CLIENT_RESPONSE,
                {'status': 'ok', 'answer': answer},
            )
        except Exception as exc:
            traceback.print_exc()
            failure = {
                'status': 'exception',
                "exception_name": exc.__class__.__name__,
                "exception_msg": str(exc),
            }
            channel.write_json(SERVER_TO_CLIENT_RESPONSE, failure)
        # Either an answer or a failure has been reported: we are done.
        return

    channel.write_json(SERVER_TO_CLIENT_RESPONSE, {'status': 'nohandler'})