import argparse
import errno
import importlib
import json
import os
import random
import subprocess
import sys
import tempfile
import time
import uuid

from pyspark import SparkFiles
from pyspark.sql import SparkSession

# Seed from OS entropy; the randomized retry backoff in _install_libs()
# below depends on it.
random.seed()

def get_server_channel(run_dir):
    # Inline stand-in for spark_etl.core.ServerChannelInterface, defined
    # here so the loader has no runtime dependency on the spark_etl package.
    # Every method stages data through a local temp file and the `hdfs`
    # command-line client, which must be on the PATH.
    class ServerChannel:
        def __init__(self, run_dir):
            self.run_dir = run_dir

        def read_json(self, spark, name):
            # Copy the file out of the HDFS run directory into a local
            # staging file, parse it, and always remove the staging file.
            stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
            stage_file.close()
            try:
                subprocess.check_call([
                    "hdfs", "dfs", "-copyToLocal", "-f",
                    os.path.join(self.run_dir, name),
                    stage_file.name
                ])
                with open(stage_file.name) as f:
                    return json.load(f)
            finally:
                os.remove(stage_file.name)

        def has_json(self, spark, name):
            # `hdfs dfs -test -f` exits 0 if the file exists and 1 if it
            # does not; any other code signals a real failure.
            exit_code = subprocess.call([
                "hdfs", "dfs", "-test", "-f",
                os.path.join(self.run_dir, name)
            ])
            if exit_code == 0:
                return True
            if exit_code == 1:
                return False
            raise Exception(f"Unrecognized exit_code: {exit_code}")

        def write_json(self, spark, name, payload):
            # Serialize the payload to a local staging file, then upload it
            # into the run directory on HDFS.
            stage_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
            stage_file.close()
            try:
                with open(stage_file.name, "wt") as f:
                    json.dump(payload, f)
                subprocess.check_call([
                    "hdfs", "dfs", "-copyFromLocal", stage_file.name,
                    os.path.join(self.run_dir, name)
                ])
            finally:
                os.remove(stage_file.name)

        def delete_json(self, spark, name):
            subprocess.check_call([
                "hdfs", "dfs", "-rm", os.path.join(self.run_dir, name)
            ])

    return ServerChannel(run_dir)
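
# A minimal usage sketch for the channel (hypothetical run directory; assumes
# an active SparkSession `spark` and an `hdfs` CLI user who can write there):
#
#   channel = get_server_channel("/runs/1234")
#   channel.write_json(spark, "status.json", {"state": "running"})
#   if channel.has_json(spark, "status.json"):
#       print(channel.read_json(spark, "status.json"))
#   channel.delete_json(spark, "status.json")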


# lib installer
def _install_libs(run_id):
    ##########################################
    # Layout under the directory containing this loader:
    #
    # {current_dir}
    #   |
    #   +-- {run_id}              <- base_dir
    #         |
    #         +-- __lock__        (transient lock file)
    #         |
    #         +-- python_libs     <- lib_dir; lib.zip is extracted here
    ##########################################
87 print("job_loader._install_libs: enter, run_id = {}".format(run_id))
89 current_dir = os.path.dirname(os.path.abspath(__file__))
90 print(f"current_dir = {current_dir}")
92 base_dir = os.path.join(current_dir, run_id)
93 lib_dir = os.path.join(base_dir, 'python_libs')
94 lib_zip = SparkFiles.get("lib.zip")
95 lock_name = os.path.join(base_dir, '__lock__')
97 os.makedirs(base_dir, exist_ok=True)
99 for i in range(0, 100):
100 try:
101 lock_fh = os.open(lock_name, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
102 os.close(lock_fh)
103 try:
104 if not os.path.isdir(lib_dir):
105 print("_install_libs: install lib starts")
106 os.makedirs(lib_dir)
107 subprocess.check_call(['unzip', "-qq", lib_zip, "-d", lib_dir])
108 print("_install_libs: install lib done")
109 if lib_dir not in sys.path:
110 print(f"_install_libs: add {lib_dir} path")
111 sys.path.insert(0, lib_dir)
112 print("job_loader._install_libs: exit")
113 return
114 finally:
115 os.remove(lock_name)
116 except OSError as e:
117 if e.errno == errno.EEXIST:
118 time.sleep(random.randint(1, 10))
119 continue
120 raise
122 raise Exception("Failed to install libraries!")
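
# Note on the lock: os.O_CREAT | os.O_EXCL makes lock-file creation atomic on
# a local filesystem, so exactly one process per host performs the unzip; the
# others hit EEXIST, sleep a random 1-10 seconds, and retry, giving up after
# 100 attempts.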
126print("job_loader.py: enter")
128parser = argparse.ArgumentParser(description='job')
129parser.add_argument(
130 "--run-id", type=str, required=True, help="Run ID",
131)
132parser.add_argument(
133 "--run-dir", type=str, required=True, help="Run Directory",
134)
135parser.add_argument(
136 "--lib-zip", type=str, required=True, help="Zipped library",
137)
139args = parser.parse_args()

spark = SparkSession.builder.appName("RunJob").getOrCreate()

sc = spark.sparkContext
sc.addFile(args.lib_zip)
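# addFile ships lib.zip to the driver and every executor node, so the
# SparkFiles.get("lib.zip") call inside _install_libs resolves to a local
# copy wherever it runs.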

_install_libs(args.run_id)

# get input
server_channel = get_server_channel(args.run_dir)
input_args = server_channel.read_json(spark, "input.json")
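# (input.json is assumed to be staged into the run directory by whichever
# client submitted this job; the loader only reads it.)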

try:
    # The shipped library must provide a top-level "main" module exposing
    # main(spark, input_args, sysops); it is importable because
    # _install_libs put lib_dir on sys.path.
    entry = importlib.import_module("main")
    result = entry.main(spark, input_args, sysops={
        "install_libs": lambda: _install_libs(args.run_id),
        "channel": get_server_channel(args.run_dir),
    })

    # save output
    server_channel.write_json(spark, "result.json", result)
    print("job_loader: exit gracefully")
finally:
    spark.stop()
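
# A minimal sketch of a compatible job module (hypothetical; shipped at the
# root of lib.zip as main.py):
#
#   def main(spark, input_args, sysops):
#       df = spark.range(input_args.get("rows", 10))
#       return {"count": df.count()}   # result must be JSON-serializable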