import uuid
import os
import subprocess
from urllib.parse import urlparse
import json
import tempfile

from jinja2 import Template

import oci
from oci_core import get_os_client, get_df_client, os_upload, os_upload_json

from spark_etl.deployers import AbstractDeployer
from spark_etl import Build, SparkETLDeploymentFailure
from .tools import check_response, remote_execute



def _save_json_temp(payload):
    # Save a dict to a temporary JSON file and return the filename.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(json.dumps(payload).encode('utf8'))
        return f.name


def get_job_loader(oci_config):
    # Render job_loader.py with Jinja2; write the rendered result to a
    # temporary file and return that file's name.
    job_loader_filename = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'job_loader.py'
    )

    if oci_config is not None:
        oci_cfg = dict(oci_config)
        key_file = oci_cfg.pop("key_file")
        with open(key_file, "rt") as key_f:
            oci_key = key_f.read()

    with open(job_loader_filename, "rt") as f:
        load_content = f.read()
    template = Template(load_content)
    if oci_config is None:
        # No explicit config: the rendered loader authenticates with an
        # instance principal.
        c = template.render(
            use_instance_principle=True,
        )
    else:
        c = template.render(
            use_instance_principle=False,
            oci_config_str=json.dumps(oci_cfg, indent=4),
            oci_key=oci_key
        )
    with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
        f.write(c)
        return f.name
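
# For reference, a hedged sketch of the Jinja2 variables job_loader.py is
# expected to consume. The real template ships alongside this module; the
# snippet below is illustrative only, not its actual content:
#
#     {% if use_instance_principle %}
#     signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
#     {% else %}
#     oci_config = json.loads('''{{ oci_config_str }}''')
#     # write {{ oci_key }} out as the API key file referenced by the config
#     {% endif %}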


class DataflowDeployer(AbstractDeployer):
    def __init__(self, config):
        super(DataflowDeployer, self).__init__(config)
        # config must have:
        #   region:   the OCI region, e.g. "IAD"
        #   dataflow: a dict carrying the Dataflow application settings:
        #     compartment_id: the compartment to create the application in
        #     driver_shape
        #     executor_shape
        #     num_executors
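        # A hypothetical config illustrating the expected shape (all values
        # below are placeholders, not real OCIDs):
        #
        #     {
        #         "region": "IAD",
        #         "oci_config": {...},  # optional; omit to use instance principal
        #         "dataflow": {
        #             "compartment_id": "ocid1.compartment.oc1..example",
        #             "driver_shape": "VM.Standard2.1",
        #             "executor_shape": "VM.Standard2.1",
        #             "num_executors": 2,
        #             "archive_uri": "oci://bucket@namespace/python.zip"  # optional
        #         }
        #     }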

    @property
    def region(self):
        return self.config["region"]

    def create_application(self, manifest, destination_location):
        df_client = get_df_client(self.region, config=self.config.get("oci_config"))
        dataflow = self.config['dataflow']

        display_name = f"{manifest['display_name']}-{manifest['version']}"

        # If the application already exists, fail the deployment: the user
        # should bump the version for a new deployment.
        r = df_client.list_applications(
            dataflow['compartment_id'],
            limit=1,
            display_name=display_name,
        )
        check_response(r, lambda: SparkETLDeploymentFailure("Unable to list applications"))
        if len(r.data) > 0:
            raise SparkETLDeploymentFailure(f"Application {display_name} already created")

        create_application_details = oci.data_flow.models.CreateApplicationDetails(
            compartment_id=dataflow['compartment_id'],
            display_name=display_name,
            driver_shape=dataflow['driver_shape'],
            executor_shape=dataflow['executor_shape'],
            num_executors=dataflow['num_executors'],
            spark_version="2.4.4",
            file_uri=f"{destination_location}/{manifest['version']}/job_loader.py",
            language="PYTHON",
            archive_uri=dataflow.get("archive_uri")
        )

        r = df_client.create_application(
            create_application_details=create_application_details
        )
        check_response(r, lambda: SparkETLDeploymentFailure("Unable to create dataflow application"))
        return r.data


    def deploy(self, build_dir, destination_location):
        o = urlparse(destination_location)
        if o.scheme != 'oci':
            raise SparkETLDeploymentFailure("destination_location must be in OCI")

        namespace = o.netloc.split('@')[1]
        bucket = o.netloc.split('@')[0]
        root_path = o.path[1:]  # remove the leading "/"
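
        # For example (hypothetical values), destination_location =
        # "oci://my-bucket@my-namespace/apps" parses to netloc
        # "my-bucket@my-namespace" and path "/apps", so bucket="my-bucket",
        # namespace="my-namespace", root_path="apps".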

        build = Build(build_dir)

        print("Uploading files:")
        # Data Flow expects the Python library archive to be named python.zip.
        os_client = get_os_client(self.region, config=self.config.get("oci_config"))
        for artifact in build.artifacts:
            os_upload(
                os_client,
                f"{build_dir}/{artifact}",
                namespace,
                bucket,
                f"{root_path}/{build.version}/{artifact}"
            )
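
        # Resulting object layout under the bucket (artifact names are
        # illustrative):
        #     {root_path}/{version}/python.zip      <- python lib expected by Data Flow
        #     {root_path}/{version}/app.zip         <- hypothetical application artifact
        #     {root_path}/{version}/job_loader.py   <- uploaded below
        #     {root_path}/{version}/deployment.json <- uploaded below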


        # let's upload the job loader
        job_loader_filename = get_job_loader(self.config.get("oci_config"))

        os_upload(
            os_client,
            job_loader_filename,
            namespace,
            bucket,
            f"{root_path}/{build.version}/job_loader.py"
        )

        application = self.create_application(build.manifest, destination_location)
        app_info = {
            "application_id": application.id,
            "compartment_id": application.compartment_id
        }

        os_upload_json(
            os_client, app_info,
            namespace, bucket, f"{root_path}/{build.version}/deployment.json"
        )

        oci_config = self.config.get("oci_config")
        if oci_config is not None:
            os_upload(
                os_client,
                _save_json_temp(oci_config),
                namespace,
                bucket,
                "oci_config.json"
            )
            os_upload(
                os_client,
                oci_config['key_file'],
                namespace,
                bucket,
                "oci_api_key.pem",
            )

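
# A minimal usage sketch (build directory, bucket, and namespace below are
# hypothetical):
#
#     deployer = DataflowDeployer(config)  # config shaped as described in __init__
#     deployer.deploy("./build/myapp", "oci://my-bucket@my-namespace/apps")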