# NOTE(review): the five lines below are navigation-help text from an HTML
# coverage/report page that leaked into this source file; kept as comments so
# the module parses.
# Hot-keys on this page
# r m x p  toggle line displays
# j k      next/prev highlighted chunk
# 0 (zero) top of page
# 1 (one)  first highlighted chunk
import argparse
import errno
import importlib
import json
import os
import random
import subprocess
import sys
import time

from pyspark import SparkFiles
from pyspark.sql import SparkSession
# Seed the module PRNG from the OS entropy source; randomness is used below
# for the lock back-off in _install_libs.
random.seed()
class ServerChannel:
    """File-based JSON message channel rooted at a run directory.

    Each message is a single JSON file under ``run_dir``.  The ``spark``
    parameter on every method is accepted for interface compatibility but
    is not used by this file-backed implementation.
    """

    def __init__(self, run_dir):
        self.run_dir = run_dir

    def _path(self, name):
        # Resolve a message name to its location under the run directory.
        return os.path.join(self.run_dir, name)

    def read_json(self, spark, name):
        """Load and return the JSON payload stored under *name*."""
        with open(self._path(name), "r") as fh:
            return json.load(fh)

    def has_json(self, spark, name):
        """Return True when a JSON message called *name* exists."""
        return os.path.isfile(self._path(name))

    def write_json(self, spark, name, payload):
        """Serialize *payload* as JSON into the file called *name*."""
        with open(self._path(name), "w") as fh:
            json.dump(payload, fh)

    def delete_json(self, spark, name):
        """Remove the JSON message called *name*."""
        os.remove(self._path(name))
32# lib installer
def _install_libs(run_home):
    """Unpack ``lib.zip`` (shipped via SparkFiles) into ``run_home/python_libs``
    and prepend that directory to ``sys.path``.

    Multiple workers may call this concurrently for the same ``run_home``; a
    lock file created with ``O_CREAT | O_EXCL`` serializes them.  A process
    that loses the race sleeps a random 1-10 seconds and retries, giving up
    after 100 attempts.

    :param run_home: per-run working directory (must already exist).
    :raises Exception: if the lock could not be acquired after 100 attempts.
    """
    print(f"job_loader._install_libs: enter, run_home = {run_home}")

    lib_zip = SparkFiles.get("lib.zip")
    lib_dir = os.path.join(run_home, "python_libs")
    lock_name = os.path.join(run_home, '__lock__')

    for _attempt in range(100):
        try:
            # O_EXCL makes creation atomic: exactly one process wins the lock.
            lock_fh = os.open(lock_name, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(lock_fh)
        except OSError as e:
            if e.errno == errno.EEXIST:
                # Another process holds the lock; back off and retry.
                time.sleep(random.randint(1, 10))
                continue
            raise
        # Lock held: install once, always release the lock.  The except above
        # deliberately covers only the lock acquisition, so an OSError raised
        # by the install itself propagates instead of being retried silently.
        try:
            if not os.path.isdir(lib_dir):
                print("_install_libs: install lib starts")
                os.makedirs(lib_dir)
                subprocess.check_call(['unzip', "-qq", lib_zip, "-d", lib_dir])
                print("_install_libs: install lib done")
            if lib_dir not in sys.path:
                print(f"_install_libs: add {lib_dir} path")
                sys.path.insert(0, lib_dir)
            print("job_loader._install_libs: exit")
            return
        finally:
            os.remove(lock_name)

    raise Exception("Failed to install libraries!")
def _bootstrap():
    """Entry point for a spark-submit run.

    Parses the CLI arguments, creates the SparkSession, stages the app and
    lib archives, installs the bundled libraries, then imports the job's
    ``main`` module and calls ``main(spark, input_args, sysops=...)``,
    persisting its return value to ``result.json`` in the run home.
    """
    parser = argparse.ArgumentParser(description='job')
    for flag, desc in (
        ("--run-id", "Run ID"),
        ("--run-dir", "Run Directory"),
        ("--app-dir", "Application Directory"),
    ):
        parser.add_argument(flag, type=str, required=True, help=desc)
    args = parser.parse_args()

    session = SparkSession.builder.appName(f"RunJob-{args.run_id}").getOrCreate()
    ctx = session.sparkContext
    # Ship the application code and its dependency bundle to the executors.
    ctx.addPyFile(os.path.join(args.app_dir, "app.zip"))
    ctx.addFile(os.path.join(args.app_dir, 'lib.zip'))

    print(f"run-id: {args.run_id}")
    print(f"run-dir: {args.run_dir}")
    print(f"app-dir: {args.app_dir}")

    run_home = os.path.join(args.run_dir, args.run_id)
    print(f"run-home: {run_home}")
    os.chdir(run_home)

    # Make the bundled libraries importable before loading the job module.
    _install_libs(run_home)

    # The job's input arguments are staged as JSON in the run home.
    with open(os.path.join(run_home, "input.json"), "r") as fh:
        input_args = json.load(fh)

    try:
        job_module = importlib.import_module("main")
        outcome = job_module.main(session, input_args, sysops={
            "install_libs": lambda: _install_libs(run_home),
            "channel": ServerChannel(run_home),
        })

        # Persist the job's result for the caller to pick up.
        with open(os.path.join(run_home, "result.json"), "w") as out_fh:
            json.dump(outcome, out_fh)
    finally:
        session.stop()
# Run unconditionally on import: this module is the script handed to
# spark-submit, so there is deliberately no __main__ guard here.
_bootstrap()