Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import uuid 

2import subprocess 

3import os 

4from urllib.parse import urlparse 

5 

6from .abstract_deployer import AbstractDeployer 

7from spark_etl import Build 

8from spark_etl.exceptions import SparkETLDeploymentFailure 

9 

10def _execute(host, cmd, error_ok=False): 

11 r = subprocess.call(["ssh", "-q", "-t", host, cmd], shell=False) 

12 if not error_ok and r != 0: 

13 raise Exception(f"command \"{cmd}\" failed with exit code {r}") 

14 

15 

16class HDFSDeployer(AbstractDeployer): 

17 """ 

18 This deployer deploys application to HDFS 

19 """ 

20 def __init__(self, config): 

21 super(HDFSDeployer, self).__init__(config) 

22 

23 def deploy(self, build_dir, deployment_location): 

24 o = urlparse(deployment_location) 

25 if o.scheme != 'hdfs': 

26 raise SparkETLDeploymentFailure("deployment_location must be in hdfs") 

27 

28 # let's copy files to the stage dir 

29 bridge_dir = os.path.join(self.config['stage_dir'], str(uuid.uuid4())) 

30 

31 bridge = self.config["bridge"] 

32 _execute(bridge, f"mkdir -p {bridge_dir}") 

33 

34 build = Build(build_dir) 

35 

36 for artifact in build.artifacts: 

37 subprocess.call([ 

38 'scp', '-q', f"{build_dir}/{artifact}", f"{bridge}:{bridge_dir}/{artifact}" 

39 ]) 

40 

41 # copy job loader 

42 job_loader_filename = os.path.join( 

43 os.path.dirname(os.path.abspath(__file__)), 

44 'job_loader.py' 

45 ) 

46 subprocess.call([ 

47 'scp', '-q', job_loader_filename, f"{bridge}:{bridge_dir}/job_loader.py" 

48 ]) 

49 

50 

51 dest_location = f"{deployment_location}/{build.version}" 

52 _execute(bridge, f"hdfs dfs -rm -r {dest_location}", error_ok=True) 

53 _execute(bridge, f"hdfs dfs -mkdir -p {dest_location}") 

54 

55 artifacts = [] 

56 artifacts.extend(build.artifacts) 

57 artifacts.append("job_loader.py") 

58 for artifact in artifacts: 

59 _execute(bridge, f"hdfs dfs -copyFromLocal {bridge_dir}/{artifact} {dest_location}/{artifact}") 

60 

61 _execute(bridge, f"rm -rf {bridge_dir}")