两条Linux命令的事,却要写这么冗长的脚本,就当测试HTTP RESTful API好了
#!/usr/bin/env python
import requests
import json
import sys
#source information
os_username = ''
os_password = ''
os_tenant_name = 'Project_%s' %os_username
os_api_host = 'http://xx.xx.xx.xx'
#payload
payload = {
"auth": {
"passwordCredentials": {
"username": os_username,
"password": os_password,
},
"tenantName": os_tenant_name,
}
}
#headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
class SecurityGroup(object):
def __init__(self):
self.token = self.token_get()
self.security_group_name = 'group'
self.payload = {}
self.payload["security_group_rule"] = {}
if len(sys.argv) == 1:
print('Please run this script like this:')
print('python security-group-create.py direction=[ingress|egress] ethertype=IPv4 protocol=[tcp|udp|icmp] port_range_min=xx port_range_max=xx remote_ip_prefix=[xx.xx.xx.xx|xx.xx.xx.xx/xx]')
sys.exit(0)
for rule in sys.argv[1:]:
rule_key, rule_value = rule.split('=', 1)
if rule_key.find('port') != -1:
rule_value = int(rule_value)
self.payload["security_group_rule"][rule_key] = rule_value
self.security_group_name += '-%s' %rule_value
print('security_group_name: %s' %self.security_group_name)
def token_get(self):
os_auth_url = os_api_host + ':5000/v2.0/tokens'
try:
r = requests.post(os_auth_url, data = json.dumps(payload), headers = headers)
except Exception as e:
print('ERROR: keystone token-get failed, %s' %e)
if r.status_code == 200:
return r.json()["access"]["token"]["id"]
else:
print('ERROR, keystone token-get status code return %s' %r.status_code)
sys.exit(0)
def security_group_create(self):
headers["X-Auth-Token"] = self.token
security_group_create_url = os_api_host + ':9696/v2.0/security-groups'
payload = {
"security_group": {
"name": self.security_group_name
}
}
try:
r = requests.post(security_group_create_url, data = json.dumps(payload), headers = headers)
except Exception as e:
print('ERROR: neutron security-group-create %s failed, %s' %(self.security_group_name, e))
if r.status_code != 201:
print('ERROR, neutron security-group-create %s return %s' %(self.security_group_name, r.status_code))
sys.exit(0)
return r.json()['security_group']['id']
def security_group_rule_create(self, security_group_id):
print('security_group_id: %s' %security_group_id)
headers["X-Auth-Token"] = self.token
self.payload["security_group_rule"]["security_group_id"] = security_group_id
self.payload["security_group_rule"]["direction"] = "ingress"
self.payload["security_group_rule"]["ethertype"] = "IPv4"
security_group_rule_create_url = os_api_host + ':9696/v2.0/security-group-rules'
try:
r = requests.post(security_group_rule_create_url, data = json.dumps(self.payload), headers = headers)
except Exception as e:
print('ERROR: neutron security-group-rule-create %s failed, %s' %(security_group_id, e))
if r.status_code != 201:
print('ERROR: neutron security-group-rule-create %s return %s' %(security_group_id, r.status_code))
print("payload: %s" %self.payload)
sys.exit(0)
print('security_group_rule: %s' %self.payload["security_group_rule"])
if __name__ == '__main__':
sg = SecurityGroup()
sg.security_group_rule_create(sg.security_group_create())
