Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import json 

2import os 

3import uuid 

4import tempfile 

5import subprocess 

6import json 

7import time 

8 

9from spark_etl.job_submitters import AbstractJobSubmitter 

10from spark_etl.core import ClientChannelInterface 

11from spark_etl.utils import CLIHandler 

12 

class ClientChannel(ClientChannelInterface):
    """Exchange JSON documents through files in a per-run directory.

    Each document called ``name`` is stored at ``<run_dir>/<run_id>/<name>``.
    """

    def __init__(self, run_dir, run_id):
        # Both components are kept; full document paths are built on demand.
        self.run_dir, self.run_id = run_dir, run_id

    def _get_json_path(self, name):
        """Return the filesystem path of the JSON document called *name*."""
        run_folder = os.path.join(self.run_dir, self.run_id)
        return os.path.join(run_folder, name)

    def read_json(self, name):
        """Parse and return the JSON document called *name*."""
        source = self._get_json_path(name)
        with open(source, "r") as fp:
            document = json.load(fp)
        return document

    def has_json(self, name):
        """Return True if a regular file exists for the document *name*."""
        candidate = self._get_json_path(name)
        return os.path.isfile(candidate)

    def write_json(self, name, payload):
        """Serialize *payload* as JSON into the document called *name*.

        Any existing file at that path is overwritten.
        """
        target = self._get_json_path(name)
        with open(target, "w") as fp:
            json.dump(payload, fp)

    def delete_json(self, name):
        """Remove the file backing the document *name* (raises if it is missing)."""
        doomed = self._get_json_path(name)
        os.remove(doomed)

35 

class PySparkJobSubmitter(AbstractJobSubmitter):
    """Submit a deployed application by running ``spark-submit`` locally.

    Data is exchanged with the job through JSON files in
    ``<config['run_dir']>/<run_id>/``: ``input.json`` is written before the
    launch and ``result.json`` is read back after a successful exit.
    """

    def __init__(self, config):
        super().__init__(config)

    def run(self, deployment_location, options=None, args=None, handlers=None,
            on_job_submitted=None, cli_mode=False):
        """Launch the job, wait for it to finish and return its result.

        Args:
            deployment_location: directory containing ``job_loader.py``; the
                application version is already baked into this location.
            options: accepted for interface compatibility; not used by this
                submitter.
            args: JSON-serializable job arguments written to ``input.json``.
                Defaults to an empty dict.
            handlers: forwarded to ``CLIHandler`` when ``cli_mode`` is true;
                otherwise unused. Defaults to an empty list.
            on_job_submitted: optional callback, invoked as
                ``on_job_submitted(run_id, vendor_info={})`` right after the
                ``spark-submit`` process is started.
            cli_mode: if true, enter a ``CLIHandler`` loop (at most once)
                while the job process is still running.

        Returns:
            The object loaded from the run's ``result.json``.

        Raises:
            Exception: if ``spark-submit`` exits with a non-zero code.
        """
        # None sentinels avoid sharing one mutable default object across calls.
        if args is None:
            args = {}
        if handlers is None:
            handlers = []

        run_id = str(uuid.uuid4())
        run_dir = self.config['run_dir']
        app_dir = deployment_location

        # Fresh per-run directory; makedirs raises if it already exists.
        os.makedirs(os.path.join(run_dir, run_id))

        # The channel resolves documents to <run_dir>/<run_id>/<name>, the same
        # location the job loader is pointed at via --run-dir/--run-id.
        client_channel = ClientChannel(run_dir, run_id)
        client_channel.write_json("input.json", args)

        p = subprocess.Popen([
            "spark-submit",
            os.path.join(deployment_location, "job_loader.py"),
            "--run-id", run_id,
            "--run-dir", run_dir,
            "--app-dir", app_dir,
        ])

        if on_job_submitted is not None:
            on_job_submitted(run_id, vendor_info={})

        exit_code = None
        cli_entered = False
        while True:
            time.sleep(1)
            exit_code = p.poll()
            if exit_code is not None:
                break

            if cli_mode and not cli_entered:
                cli_entered = True
                # The lambda lets CLIHandler ask whether the job process is
                # still alive (presumably to stop its loop once it exits).
                cli_handler = CLIHandler(
                    client_channel,
                    lambda: p.poll() is None,
                    handlers,
                )
                cli_handler.loop()

        if exit_code != 0:
            # Surface the failure to the caller instead of falling through to
            # the success path and reading a result that may not exist.
            raise Exception(f"Job failed, exit_code = {exit_code}")

        print("Job completed successfully")
        return client_channel.read_json("result.json")