本来两条 Linux 命令(neutron security-group-create 和 neutron security-group-rule-create)就能搞定的事,却要写这么冗长的脚本,就当是测试 HTTP RESTful API 好了。
#!/usr/bin/env python
"""Create a security group and one rule in it through the OpenStack REST API.

The script authenticates against the Keystone v2.0 token endpoint (port 5000),
creates a security group through the Neutron v2.0 API (port 9696) and then adds
a single rule to it.  The rule is described on the command line as
``key=value`` pairs, for example::

    python security-group-create.py protocol=tcp port_range_min=22 \
        port_range_max=22 remote_ip_prefix=10.0.0.0/24

Any key containing ``port`` is converted to an integer; every other value is
sent as a string.  The group name is ``group`` followed by each value, joined
with ``-``.  Failures are reported on stdout and end the process with exit
status 1.
"""
import json
import sys

import requests

# source information -- fill these in before running the script
os_username = ''
os_password = ''
os_tenant_name = 'Project_%s' % os_username
os_api_host = 'http://xx.xx.xx.xx'

# Seconds to wait for each API call; without a timeout a stalled endpoint
# would block the script forever.
REQUEST_TIMEOUT = 30

# payload sent to Keystone to obtain a token
payload = {
    "auth": {
        "passwordCredentials": {
            "username": os_username,
            "password": os_password,
        },
        "tenantName": os_tenant_name,
    }
}

# headers shared by every request (the auth token is added per request)
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}


def _post_json(url, body, request_headers, action):
    """POST ``body`` as JSON to ``url`` and return the response.

    ``action`` is the human-readable name of the operation, used in the error
    message.  If the request itself fails (connection error, timeout, bad
    URL, ...) the error is printed and the process exits with status 1, so the
    caller never has to deal with a missing response object.
    """
    try:
        return requests.post(url, data=json.dumps(body),
                             headers=request_headers,
                             timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException as e:
        print('ERROR: %s failed, %s' % (action, e))
        sys.exit(1)


class SecurityGroup(object):
    """A security group plus one rule, built from the command-line arguments.

    Attributes:
        token: Keystone token id used for the Neutron calls.
        security_group_name: ``group`` followed by every argument value.
        payload: request body for the security-group-rule creation call.
    """

    def __init__(self):
        """Parse ``sys.argv`` into the rule payload, then fetch a token.

        With no arguments the usage text is printed and the process exits
        with status 0.  A malformed argument exits with status 1.
        """
        # Validate the command line before touching the network, so that a
        # plain invocation shows the usage text instead of an auth error.
        if len(sys.argv) == 1:
            print('Please run this script like this:')
            print('python security-group-create.py direction=[ingress|egress] ethertype=IPv4 protocol=[tcp|udp|icmp] port_range_min=xx port_range_max=xx remote_ip_prefix=[xx.xx.xx.xx|xx.xx.xx.xx/xx]')
            sys.exit(0)

        self.security_group_name = 'group'
        self.payload = {"security_group_rule": {}}

        for rule in sys.argv[1:]:
            rule_key, separator, rule_value = rule.partition('=')
            if not separator:
                print("ERROR: invalid argument '%s', expected key=value" % rule)
                sys.exit(1)
            if 'port' in rule_key:
                # Port bounds are sent as integers.
                try:
                    rule_value = int(rule_value)
                except ValueError:
                    print("ERROR: %s must be an integer, got '%s'"
                          % (rule_key, rule_value))
                    sys.exit(1)
            self.payload["security_group_rule"][rule_key] = rule_value
            self.security_group_name += '-%s' % rule_value

        self.token = self.token_get()
        print('security_group_name: %s' % self.security_group_name)

    def _auth_headers(self):
        """Return a copy of the shared headers with the auth token added."""
        auth_headers = dict(headers)
        auth_headers["X-Auth-Token"] = self.token
        return auth_headers

    def token_get(self):
        """Request a token from Keystone and return its id.

        Exits with status 1 if the request fails or the response status is
        not 200.
        """
        os_auth_url = os_api_host + ':5000/v2.0/tokens'
        r = _post_json(os_auth_url, payload, headers, 'keystone token-get')
        if r.status_code != 200:
            print('ERROR, keystone token-get status code return %s' % r.status_code)
            sys.exit(1)
        return r.json()["access"]["token"]["id"]

    def security_group_create(self):
        """Create the security group and return its id.

        Exits with status 1 if the request fails or the response status is
        not 201.
        """
        security_group_create_url = os_api_host + ':9696/v2.0/security-groups'
        body = {
            "security_group": {
                "name": self.security_group_name
            }
        }
        r = _post_json(
            security_group_create_url, body, self._auth_headers(),
            'neutron security-group-create %s' % self.security_group_name)
        if r.status_code != 201:
            print('ERROR, neutron security-group-create %s return %s' % (self.security_group_name, r.status_code))
            sys.exit(1)
        return r.json()['security_group']['id']

    def security_group_rule_create(self, security_group_id):
        """Create the rule described by ``self.payload`` in the given group.

        ``direction`` and ``ethertype`` default to ``ingress`` and ``IPv4``
        when they were not supplied on the command line.  Exits with status 1
        if the request fails or the response status is not 201.
        """
        print('security_group_id: %s' % security_group_id)
        rule = self.payload["security_group_rule"]
        rule["security_group_id"] = security_group_id
        # Only fill in defaults: values given on the command line
        # (e.g. direction=egress) must not be overwritten.
        rule.setdefault("direction", "ingress")
        rule.setdefault("ethertype", "IPv4")
        security_group_rule_create_url = os_api_host + ':9696/v2.0/security-group-rules'
        r = _post_json(
            security_group_rule_create_url, self.payload, self._auth_headers(),
            'neutron security-group-rule-create %s' % security_group_id)
        if r.status_code != 201:
            print('ERROR: neutron security-group-rule-create %s return %s' % (security_group_id, r.status_code))
            print("payload: %s" % self.payload)
            sys.exit(1)
        print('security_group_rule: %s' % self.payload["security_group_rule"])


if __name__ == '__main__':
    sg = SecurityGroup()
    sg.security_group_rule_create(sg.security_group_create())