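"""
HDFSDeployer: deploy a spark-etl application build to HDFS by staging the
build artifacts on an ssh "bridge" host and pushing them with the hdfs CLI.
"""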
import uuid
import subprocess
import os
from urllib.parse import urlparse

from .abstract_deployer import AbstractDeployer
from spark_etl import Build
from spark_etl.exceptions import SparkETLDeploymentFailure


def _execute(host, cmd, error_ok=False):
    """Run a shell command on a remote host over ssh; raise unless error_ok."""
    r = subprocess.call(["ssh", "-q", "-t", host, cmd], shell=False)
    if not error_ok and r != 0:
        raise Exception(f'command "{cmd}" failed with exit code {r}')


class HDFSDeployer(AbstractDeployer):
    """
    This deployer deploys an application to HDFS through an ssh bridge host
    that has an hdfs client configured.
    """
    def __init__(self, config):
        super().__init__(config)

    def deploy(self, build_dir, deployment_location):
        o = urlparse(deployment_location)
        if o.scheme != 'hdfs':
            raise SparkETLDeploymentFailure("deployment_location must be in hdfs")

        # stage files in a unique scratch directory on the bridge host
        bridge_dir = os.path.join(self.config['stage_dir'], str(uuid.uuid4()))

        bridge = self.config["bridge"]
        _execute(bridge, f"mkdir -p {bridge_dir}")

        build = Build(build_dir)

        # copy each build artifact to the bridge host; check_call raises
        # CalledProcessError if any scp fails, rather than failing silently
        for artifact in build.artifacts:
            subprocess.check_call([
                'scp', '-q', f"{build_dir}/{artifact}", f"{bridge}:{bridge_dir}/{artifact}"
            ])

        # copy the job loader that ships alongside this module
        job_loader_filename = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'job_loader.py'
        )
        subprocess.check_call([
            'scp', '-q', job_loader_filename, f"{bridge}:{bridge_dir}/job_loader.py"
        ])

        # recreate the versioned destination directory in HDFS; the rm may
        # fail when the directory does not exist yet, hence error_ok
        dest_location = f"{deployment_location}/{build.version}"
        _execute(bridge, f"hdfs dfs -rm -r {dest_location}", error_ok=True)
        _execute(bridge, f"hdfs dfs -mkdir -p {dest_location}")

        # push the staged artifacts plus the job loader into HDFS
        artifacts = list(build.artifacts)
        artifacts.append("job_loader.py")
        for artifact in artifacts:
            _execute(bridge, f"hdfs dfs -copyFromLocal {bridge_dir}/{artifact} {dest_location}/{artifact}")

        # clean up the scratch directory on the bridge host
        _execute(bridge, f"rm -rf {bridge_dir}")
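

# Usage sketch (illustrative, not part of this module): the config is assumed
# to carry the two keys read above, "bridge" (an ssh-reachable host with an
# hdfs client) and "stage_dir" (a scratch directory on that host); the host
# name and paths below are made up for the example.
#
#     deployer = HDFSDeployer({
#         "bridge": "mybridge.example.com",
#         "stage_dir": "/tmp/spark-etl-stage",
#     })
#     deployer.deploy("/path/to/build_dir", "hdfs:///apps/my_app")
#
# deploy() then uploads each build artifact (plus job_loader.py) to
# hdfs:///apps/my_app/<build.version>.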