Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import uuid
2import os
3import subprocess
4from urllib.parse import urlparse
5import json
6import tempfile
8from jinja2 import Template
10import oci
11from oci_core import get_os_client, get_df_client, os_upload, os_upload_json
13from spark_etl.deployers import AbstractDeployer
14from spark_etl import Build, SparkETLDeploymentFailure
15from .tools import check_response, remote_execute
17def _save_json_temp(payload):
18 # save a dict to temporary json file, and return filename
19 with tempfile.NamedTemporaryFile(delete=False) as f:
20 f.write(json.dumps(payload).encode('utf8'))
21 return f.name
def get_job_loader(oci_config):
    """Render the job_loader.py template into a temporary file.

    Returns the filename of the rendered result. When oci_config is
    None, the loader is rendered to authenticate via instance
    principle; otherwise the config (minus ``key_file``) and the API
    key file's content are embedded into the rendered script.
    """
    loader_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'job_loader.py'
    )
    with open(loader_path, "rt") as loader_f:
        template = Template(loader_f.read())

    if oci_config is None:
        rendered = template.render(
            use_instance_principle = True,
        )
    else:
        cfg = dict(oci_config)
        key_filename = cfg.pop("key_file")
        with open(key_filename, "rt") as key_f:
            key_content = key_f.read()
        rendered = template.render(
            use_instance_principle = False,
            oci_config_str = json.dumps(cfg, indent=4),
            oci_key = key_content
        )

    with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as out_f:
        out_f.write(rendered)
        return out_f.name
class DataflowDeployer(AbstractDeployer):
    """Deploy a spark-etl build as an OCI Data Flow application.

    The config dict must provide:
        region:     OCI region short name, e.g. "IAD"
        dataflow:   dict with
            compartment_id: compartment to hold the Data Flow application
            driver_shape:   driver VM shape
            executor_shape: executor VM shape
            num_executors:  number of executors
            archive_uri:    (optional) dependency archive location
            spark_version:  (optional) Spark runtime, defaults to "2.4.4"
        oci_config: (optional) OCI API-key config dict; when absent,
                    instance principle authentication is used
    """

    def __init__(self, config):
        super(DataflowDeployer, self).__init__(config)

    @property
    def region(self):
        # e.g. "IAD"
        return self.config["region"]

    def create_application(self, manifest, destination_location):
        """Create the Data Flow application for this build.

        Raises SparkETLDeploymentFailure if an application with the
        same display name already exists (the user should bump the
        version for a new deployment) or if any OCI call fails.
        Returns the created application model object.
        """
        df_client = get_df_client(self.region, config=self.config.get("oci_config"))
        dataflow = self.config['dataflow']

        display_name = f"{manifest['display_name']}-{manifest['version']}"

        # If the application already exists, fail the deployment,
        # since the user should bump the version for a new deployment.
        r = df_client.list_applications(
            dataflow['compartment_id'],
            limit=1,
            display_name=display_name,
        )
        check_response(r, lambda : SparkETLDeploymentFailure("Unable to list application"))
        if len(r.data) > 0:
            raise SparkETLDeploymentFailure(f"Application {display_name} already created")

        create_application_details = oci.data_flow.models.CreateApplicationDetails(
            compartment_id=dataflow['compartment_id'],
            display_name=display_name,
            driver_shape=dataflow['driver_shape'],
            executor_shape=dataflow['executor_shape'],
            num_executors=dataflow['num_executors'],
            # Allow the config to override the Spark runtime; keep the
            # previous hard-coded value as the default.
            spark_version=dataflow.get("spark_version", "2.4.4"),
            file_uri=f"{destination_location}/{manifest['version']}/job_loader.py",
            language="PYTHON",
            archive_uri=dataflow.get("archive_uri")
        )

        r = df_client.create_application(
            create_application_details=create_application_details
        )
        check_response(r, lambda : SparkETLDeploymentFailure("Unable to create dataflow application"))
        return r.data

    def deploy(self, build_dir, destination_location):
        """Upload the build and register it as a Data Flow application.

        destination_location must be of the form
        "oci://bucket@namespace/path". Uploads all build artifacts plus
        the rendered job loader, creates the application, and writes a
        deployment.json describing it. When an API-key oci_config is
        used, also uploads the config and key so the job loader can
        authenticate at run time.

        Raises SparkETLDeploymentFailure on a malformed destination or
        any failed OCI call.
        """
        o = urlparse(destination_location)
        if o.scheme != 'oci':
            raise SparkETLDeploymentFailure("destination_location must be in OCI")
        if '@' not in o.netloc:
            # Would otherwise surface as a bare IndexError on the split below.
            raise SparkETLDeploymentFailure(
                "destination_location must be in the format oci://bucket@namespace/path"
            )

        bucket, namespace = o.netloc.split('@', 1)
        root_path = o.path[1:]  # remove the leading "/"

        build = Build(build_dir)

        print("Uploading files:")
        # Data Flow wants the python lib to be called python.zip
        os_client = get_os_client(self.region, config=self.config.get("oci_config"))
        for artifact in build.artifacts:
            os_upload(
                os_client,
                f"{build_dir}/{artifact}",
                namespace,
                bucket,
                f"{root_path}/{build.version}/{artifact}"
            )

        # let's upload the job loader
        job_loader_filename = get_job_loader(self.config.get("oci_config"))
        os_upload(
            os_client,
            job_loader_filename,
            namespace,
            bucket,
            f"{root_path}/{build.version}/job_loader.py"
        )

        application = self.create_application(build.manifest, destination_location)
        app_info = {
            "application_id": application.id,
            "compartment_id": application.compartment_id
        }
        os_upload_json(
            os_client, app_info,
            namespace, bucket, f"{root_path}/{build.version}/deployment.json"
        )

        # When an API-key config is used, publish it (and the key file)
        # at well-known object names so the job loader can pick them up.
        oci_config = self.config.get("oci_config")
        if oci_config is not None:
            os_upload(
                os_client,
                _save_json_temp(oci_config),
                namespace,
                bucket,
                "oci_config.json"
            )
            os_upload(
                os_client,
                oci_config['key_file'],
                namespace,
                bucket,
                "oci_api_key.pem",
            )