import importlib
import argparse
import uuid
import os
import errno
import subprocess
import sys
import tempfile
import json
import random
import time

from pyspark import SparkFiles
from pyspark.sql import SparkSession

random.seed()


def get_server_channel(run_dir):
    # from spark_etl.core import ServerChannelInterface
    # ServerChannel is a local stand-in for that interface: it moves small
    # JSON payloads between this job and its run directory on HDFS by
    # staging them through a local temp file and the `hdfs dfs` CLI.
    class ServerChannel:
        def __init__(self, run_dir):
            self.run_dir = run_dir

        def read_json(self, spark, name):
            # copy {run_dir}/{name} from HDFS to a local temp file, then parse it
            stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
            stage_file.close()
            try:
                subprocess.check_call([
                    "hdfs", "dfs", "-copyToLocal", "-f",
                    os.path.join(self.run_dir, name),
                    stage_file.name
                ])
                with open(stage_file.name) as f:
                    return json.load(f)
            finally:
                os.remove(stage_file.name)

        def has_json(self, spark, name):
            # `hdfs dfs -test -f` exits 0 if the file exists, 1 if it does not
            exit_code = subprocess.call([
                "hdfs", "dfs", "-test", "-f",
                os.path.join(self.run_dir, name)
            ])
            if exit_code == 0:
                return True
            if exit_code == 1:
                return False
            raise Exception(f"Unrecognized exit_code: {exit_code}")

        def write_json(self, spark, name, payload):
            # dump the payload to a local temp file, then copy it to {run_dir}/{name}
            stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
            stage_file.close()
            try:
                with open(stage_file.name, "wt") as f:
                    json.dump(payload, f)
                subprocess.check_call([
                    "hdfs", "dfs", "-copyFromLocal", stage_file.name,
                    os.path.join(self.run_dir, name)
                ])
            finally:
                os.remove(stage_file.name)

        def delete_json(self, spark, name):
            subprocess.check_call([
                "hdfs", "dfs", "-rm", os.path.join(self.run_dir, name)
            ])

    return ServerChannel(run_dir)

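# Note: per the driver code below, "input.json" is expected to already exist
# under {run_dir} when this loader starts, and the value returned by the job
# is written back to {run_dir}/result.json through this channel.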


# lib installer
def _install_libs(run_id):
    ##########################################
    # Layout created next to this script:
    #
    # {current_dir}
    # |
    # +-- {run_id}              <-- base_dir
    #     |
    #     +-- __lock__          <-- lock file guarding the install
    #     |
    #     +-- python_libs       <-- lib.zip extracted here, put on sys.path
    ##########################################
    print("job_loader._install_libs: enter, run_id = {}".format(run_id))

    current_dir = os.path.dirname(os.path.abspath(__file__))
    print(f"current_dir = {current_dir}")

    base_dir = os.path.join(current_dir, run_id)
    lib_dir = os.path.join(base_dir, 'python_libs')
    lib_zip = SparkFiles.get("lib.zip")
    lock_name = os.path.join(base_dir, '__lock__')

    os.makedirs(base_dir, exist_ok=True)

    # Several processes may run this concurrently on the same node, so the
    # extraction is guarded by an exclusive lock file.
    for _ in range(100):
        try:
            # O_CREAT | O_EXCL fails with EEXIST if another process holds the lock
            lock_fh = os.open(lock_name, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(lock_fh)
            try:
                if not os.path.isdir(lib_dir):
                    print("_install_libs: install lib starts")
                    os.makedirs(lib_dir)
                    subprocess.check_call(['unzip', "-qq", lib_zip, "-d", lib_dir])
                    print("_install_libs: install lib done")
                if lib_dir not in sys.path:
                    print(f"_install_libs: add {lib_dir} path")
                    sys.path.insert(0, lib_dir)
                print("job_loader._install_libs: exit")
                return
            finally:
                os.remove(lock_name)
        except OSError as e:
            if e.errno == errno.EEXIST:
                # lock is held by another process; back off and retry
                time.sleep(random.randint(1, 10))
                continue
            raise

    raise Exception("Failed to install libraries!")

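# Driver-side flow: parse arguments, ship lib.zip to the cluster via
# sc.addFile, install it locally, read input.json, invoke the job's main(),
# and write the result back as result.json.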

print("job_loader.py: enter")

parser = argparse.ArgumentParser(description='job')
parser.add_argument(
    "--run-id", type=str, required=True, help="Run ID",
)
parser.add_argument(
    "--run-dir", type=str, required=True, help="Run Directory",
)
parser.add_argument(
    "--lib-zip", type=str, required=True, help="Zipped library",
)

args = parser.parse_args()
spark = SparkSession.builder.appName("RunJob").getOrCreate()

sc = spark.sparkContext
sc.addFile(args.lib_zip)

_install_libs(args.run_id)

# get input
server_channel = get_server_channel(args.run_dir)
input_args = server_channel.read_json(spark, "input.json")

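# The code shipped in lib.zip is expected to provide a top-level "main"
# module (imported below). A minimal sketch of the assumed contract; the
# body shown here is illustrative, only the signature comes from this loader:
#
#     # main.py, at the root of lib.zip
#     def main(spark, input_args, sysops={}):
#         # input_args: the payload parsed from input.json
#         # sysops["install_libs"](): re-runs _install_libs on the calling process
#         # sysops["channel"]: read_json / has_json / write_json / delete_json
#         return {"status": "ok"}   # JSON-serialized to result.json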
try:
    entry = importlib.import_module("main")
    result = entry.main(spark, input_args, sysops={
        "install_libs": lambda: _install_libs(args.run_id),
        "channel": get_server_channel(args.run_dir)
    })

    # save output
    server_channel.write_json(spark, "result.json", result)
    print("job_loader: exit gracefully")
finally:
    spark.stop()
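
# Illustrative invocation (a sketch only; the master, deploy mode, and paths
# below are assumptions, not part of this script):
#
#     spark-submit --master yarn --deploy-mode cluster \
#         job_loader.py \
#         --run-id  <uuid> \
#         --run-dir hdfs:///path/to/runs/<uuid> \
#         --lib-zip /path/to/lib.zip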