import argparse
import errno
import importlib
import json
import os
import random
import subprocess
import sys
import time

from pyspark import SparkFiles
from pyspark.sql import SparkSession

# Seed from system entropy; used for the randomized lock-retry backoff below.
random.seed()

class ServerChannel:
    """File-based channel for exchanging JSON payloads via the run directory."""

    def __init__(self, run_dir):
        self.run_dir = run_dir

    def read_json(self, spark, name):
        with open(os.path.join(self.run_dir, name), "r") as f:
            return json.load(f)

    def has_json(self, spark, name):
        return os.path.isfile(os.path.join(self.run_dir, name))

    def write_json(self, spark, name, payload):
        with open(os.path.join(self.run_dir, name), "w") as f:
            json.dump(payload, f)

    def delete_json(self, spark, name):
        os.remove(os.path.join(self.run_dir, name))
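
# A minimal sketch (not part of this loader) of how a submitted job's "main"
# module might use the channel handed to it through sysops; the file names
# "progress.json" and "control.json" are illustrative assumptions:
#
#   def main(spark, input_args, sysops):
#       channel = sysops["channel"]
#       channel.write_json(spark, "progress.json", {"pct": 50})
#       if channel.has_json(spark, "control.json"):
#           control = channel.read_json(spark, "control.json")
#           channel.delete_json(spark, "control.json")
#       return {"status": "ok"}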

# Library installer: unpacks lib.zip (distributed via SparkFiles) into a
# per-run directory exactly once per host, guarded by an exclusive lock file.
def _install_libs(run_home):
    print(f"job_loader._install_libs: enter, run_home = {run_home}")

    lib_zip = SparkFiles.get("lib.zip")
    lib_dir = os.path.join(run_home, "python_libs")
    lock_name = os.path.join(run_home, "__lock__")

    for i in range(100):
        try:
            # O_CREAT | O_EXCL fails with EEXIST if another process holds the lock.
            lock_fh = os.open(lock_name, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(lock_fh)
            try:
                if not os.path.isdir(lib_dir):
                    print("_install_libs: install lib starts")
                    os.makedirs(lib_dir)
                    subprocess.check_call(["unzip", "-qq", lib_zip, "-d", lib_dir])
                    print("_install_libs: install lib done")
                if lib_dir not in sys.path:
                    print(f"_install_libs: add {lib_dir} path")
                    sys.path.insert(0, lib_dir)
                print("job_loader._install_libs: exit")
                return
            finally:
                os.remove(lock_name)
        except OSError as e:
            if e.errno == errno.EEXIST:
                # Another process holds the lock; back off and retry.
                time.sleep(random.randint(1, 10))
                continue
            raise

    raise Exception("Failed to install libraries!")
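
# One plausible reason _install_libs is also exposed through sysops (an
# assumption, not stated in this file): a job can invoke it inside executor
# tasks so lib.zip gets unpacked and put on sys.path on every worker, not
# just the driver. "some_bundled_lib" is a hypothetical module inside lib.zip:
#
#   def main(spark, input_args, sysops):
#       install_libs = sysops["install_libs"]
#       def process_partition(rows):
#           install_libs()           # idempotent: lock, unzip once, extend sys.path
#           import some_bundled_lib  # hypothetical module shipped in lib.zip
#           return (some_bundled_lib.transform(r) for r in rows)
#       rdd = spark.sparkContext.parallelize(input_args["items"])
#       return rdd.mapPartitions(process_partition).collect()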

def _bootstrap():
    parser = argparse.ArgumentParser(description="job")
    parser.add_argument(
        "--run-id", type=str, required=True, help="Run ID",
    )
    parser.add_argument(
        "--run-dir", type=str, required=True, help="Run Directory",
    )
    parser.add_argument(
        "--app-dir", type=str, required=True, help="Application Directory",
    )
    args = parser.parse_args()
    spark = SparkSession.builder.appName(f"RunJob-{args.run_id}").getOrCreate()

    # Ship the application code and the bundled libraries to the executors.
    sc = spark.sparkContext
    sc.addPyFile(os.path.join(args.app_dir, "app.zip"))
    sc.addFile(os.path.join(args.app_dir, "lib.zip"))

    print(f"run-id: {args.run_id}")
    print(f"run-dir: {args.run_dir}")
    print(f"app-dir: {args.app_dir}")

    run_home = os.path.join(args.run_dir, args.run_id)
    print(f"run-home: {run_home}")
    os.chdir(run_home)

    # Set up the library path on the driver.
    _install_libs(run_home)

    # Load the job's input arguments.
    with open(os.path.join(run_home, "input.json"), "r") as f:
        input_args = json.load(f)

    try:
        # app.zip was added via addPyFile, so its top-level "main" module is
        # importable here.
        entry = importlib.import_module("main")
        result = entry.main(spark, input_args, sysops={
            "install_libs": lambda: _install_libs(run_home),
            "channel": ServerChannel(os.path.join(args.run_dir, args.run_id)),
        })

        # Save the job's output.
        with open(os.path.join(run_home, "result.json"), "w") as out_f:
            json.dump(result, out_f)

    finally:
        spark.stop()


_bootstrap()
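
# For reference, a hedged sketch of how this loader might be launched; the
# script name "job_loader.py" matches the log prefix above, but the master
# URL and paths are placeholder assumptions:
#
#   spark-submit --master yarn job_loader.py \
#       --run-id <run-id> \
#       --run-dir /path/to/runs \
#       --app-dir /path/to/app
#
# The loader expects <run-dir>/<run-id>/input.json to exist and writes the
# job's return value to <run-dir>/<run-id>/result.json.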