Projects
Eulaceura:Factory
python-boto
_service:obs_scm:boto-2.40.0-nat-gateway.patch
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:boto-2.40.0-nat-gateway.patch of Package python-boto
Index: boto-2.40.0/boto/vpc/__init__.py =================================================================== --- boto-2.40.0.orig/boto/vpc/__init__.py +++ boto-2.40.0/boto/vpc/__init__.py @@ -29,6 +29,7 @@ from boto.vpc.vpc import VPC from boto.vpc.customergateway import CustomerGateway from boto.vpc.networkacl import NetworkAcl from boto.vpc.routetable import RouteTable +from boto.vpc.natgateway import NatGateway from boto.vpc.internetgateway import InternetGateway from boto.vpc.vpngateway import VpnGateway, Attachment from boto.vpc.dhcpoptions import DhcpOptions @@ -783,6 +784,76 @@ class VPCConnection(EC2Connection): return self.get_status('DeleteNetworkAclEntry', params) + # NAT Gateways + + def get_all_nat_gateways(self, nat_gateway_ids=None, filters=None, dry_run=False): + """ + Get a list of NAT gateways. You can filter results to return information + about only those gateways that you're interested in. + + :type nat_gateway_ids: list + :param nat_gateway_ids: A list of strings with the desired gateway IDs. + + :type filters: list of tuples or dict + :param filters: A list of tuples or dict containing filters. Each tuple + or dict item consists of a filter key and a filter value. + + :type dry_run: bool + :param dry_run: Set to True if the operation should not actually run. + + """ + params = {} + + if nat_gateway_ids: + self.build_list_params(params, nat_gateway_ids, + 'NatGatewayId') + if filters: + self.build_filter_params(params, filters) + if dry_run: + params['DryRun'] = 'true' + return self.get_list('DescribeNatGateways', params, + [('item', NatGateway)]) + + def create_nat_gateway(self, subnet_id, allocation_id, dry_run=False): + """ + Creates a NAT gateway for VPC. + + :type subnet_id: str + :param subnet_id: The subnet in which the NAT gateway should be launched. + + :type allocation_id: str + :param allocation_id: The allocation ID of an elastic IP address for the public side of the gateway. + + :type dry_run: bool + :param dry_run: Set to True if the operation should not actually run. + + :rtype: Newly created nat gateway. + :return: `boto.vpc.natgateway.NATGateway` + """ + params = {'SubnetId': subnet_id, + 'AllocationId': allocation_id} + if dry_run: + params['DryRun'] = 'true' + return self.get_object('CreateNatGateway', params, NatGateway) + + def delete_nat_gateway(self, nat_gateway_id, dry_run=False): + """ + Deletes a NAT gateway from the VPC. + + :type nat_gateway_id: str + :param nat_gateway_id: The ID of the NAT gateway to delete. + + :type dry_run: bool + :param dry_run: Set to True if the operation should not actually run. + + :rtype: Bool + :return: True if successful + """ + params = {'NatGatewayId': nat_gateway_id} + if dry_run: + params['DryRun'] = 'true' + return self.get_status('DeleteNatGateway', params) + # Internet Gateways def get_all_internet_gateways(self, internet_gateway_ids=None, Index: boto-2.40.0/boto/vpc/natgateway.py =================================================================== --- /dev/null +++ boto-2.40.0/boto/vpc/natgateway.py @@ -0,0 +1,89 @@ +# Copyright (c) 2009-2010 Mitch Garnaat http://garnaat.org/ +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +""" +Represents a NAT Gateway +""" + +from boto.ec2.ec2object import TaggedEC2Object +from boto.resultset import ResultSet + + +class NatGateway(TaggedEC2Object): + def __init__(self, connection=None): + super(NatGateway, self).__init__(connection) + self.id = None + self.vpc_id = None + self.subnet_id = None + self.state = None + self.addresses = [] + + def __repr__(self): + return 'NatGateway:%s' % self.id + + def startElement(self, name, attrs, connection): + result = super(NatGateway, self).startElement(name, attrs, connection) + + if result is not None: + # Parent found an interested element, just return it + return result + + if name == 'natGatewayAddressSet': + self.addresses = ResultSet([('item', NatGatewayAddress)]) + return self.addresses + else: + return None + + def endElement(self, name, value, connection): + if name == 'natGatewayId': + self.id = value + elif name == 'vpcId': + self.vpc_id = value + elif name == 'subnetId': + self.subnet_id = value + elif name == 'state': + self.state = value + else: + setattr(self, name, value) + + +class NatGatewayAddress(object): + def __init__(self, connection=None): + self.interface_id = None + self.allocation_id = None + self.ip_public = None + self.ip_private = None + + def __repr__(self): + return 'NatGatewayAddress:%s' % self.interface_id + + def startElement(self, name, attrs, connection): + return None + + def endElement(self, name, value, connection): + if name == 'networkInterfaceId': + self.interface_id = value + elif name == 'publicIp': + self.ip_public = value + elif name == 'allocationId': + self.allocation_id = value + elif name == 'privateIp': + self.ip_private = value Index: boto-2.40.0/docs/source/ref/vpc.rst =================================================================== --- boto-2.40.0.orig/docs/source/ref/vpc.rst +++ boto-2.40.0/docs/source/ref/vpc.rst @@ -32,6 +32,13 @@ boto.vpc.internetgateway :members: :undoc-members: +boto.vpc.natgateway +------------------- + +.. automodule:: boto.vpc.natgateway + :members: + :undoc-members: + boto.vpc.routetable ------------------- Index: boto-2.40.0/tests/unit/vpc/test_natgateway.py =================================================================== --- /dev/null +++ boto-2.40.0/tests/unit/vpc/test_natgateway.py @@ -0,0 +1,113 @@ +from tests.unit import unittest +from tests.unit import AWSMockServiceTestCase + +from boto.vpc import VPCConnection, NatGateway + + +class TestDescribeNatGateway(AWSMockServiceTestCase): + + connection_class = VPCConnection + + def default_body(self): + return b""" + <DescribeNatGatewaysResponse xmlns="http://ec2.amazonaws.com/doc/2015-10-01/"> + <requestId>bfed02c6-dae9-47c0-86a2-example</requestId> + <natGatewaySet> + <item> + <subnetId>subnet-1a2a3a4a</subnetId> + <natGatewayAddressSet> + <item> + <networkInterfaceId>eni-00e37850</networkInterfaceId> + <publicIp>198.18.125.129</publicIp> + <allocationId>eipalloc-37fc1a52</allocationId> + <privateIp>10.0.2.147</privateIp> + </item> + </natGatewayAddressSet> + <createTime>2015-11-25T14:00:55.416Z</createTime> + <vpcId>vpc-4e20d42b</vpcId> + <natGatewayId>nat-04e77a5e9c34432f9</natGatewayId> + <state>available</state> + </item> + </natGatewaySet> + </DescribeNatGatewaysResponse> + """ + + def test_describe_nat_gateway(self): + self.set_http_response(status_code=200) + api_response = self.service_connection.get_all_nat_gateways( + 'nat-04e77a5e9c34432f9', filters=[('natGatewayAddress.allocationId', ['eipalloc-37fc1a52'])]) + self.assert_request_parameters({ + 'Action': 'DescribeNatGateways', + 'NatGatewayId.1': 'nat-04e77a5e9c34432f9', + 'Filter.1.Name': 'natGatewayAddress.allocationId', + 'Filter.1.Value.1': 'eipalloc-37fc1a52'}, + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', + 'SignatureVersion', 'Timestamp', + 'Version']) + self.assertEquals(len(api_response), 1) + self.assertIsInstance(api_response[0], NatGateway) + self.assertEqual(api_response[0].id, 'nat-04e77a5e9c34432f9') + + +class TestCreateNatGateway(AWSMockServiceTestCase): + + connection_class = VPCConnection + + def default_body(self): + return b""" + <CreateNatGatewayResponse xmlns="http://ec2.amazonaws.com/doc/2015-10-01/"> + <requestId>1b74dc5c-bcda-403f-867d-example</requestId> + <natGateway> + <subnetId>subnet-1a2b3c4d</subnetId> + <natGatewayAddressSet> + <item> + <allocationId>eipalloc-37fc1a52</allocationId> + </item> + </natGatewayAddressSet> + <createTime>2015-11-25T14:00:55.416Z</createTime> + <vpcId>vpc-4e20d42b</vpcId> + <natGatewayId>nat-04e77a5e9c34432f9</natGatewayId> + <state>pending</state> + </natGateway> + </CreateNatGatewayResponse> + """ + + def test_create_nat_gateway(self): + self.set_http_response(status_code=200) + api_response = self.service_connection.create_nat_gateway('subnet-1a2b3c4d', 'eipalloc-37fc1a52') + self.assert_request_parameters({ + 'Action': 'CreateNatGateway', + 'SubnetId': 'subnet-1a2b3c4d', + 'AllocationId': 'eipalloc-37fc1a52'}, + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', + 'SignatureVersion', 'Timestamp', + 'Version']) + self.assertIsInstance(api_response, NatGateway) + self.assertEqual(api_response.id, 'nat-04e77a5e9c34432f9') + + +class TestDeleteNatGateway(AWSMockServiceTestCase): + + connection_class = VPCConnection + + def default_body(self): + return b""" + <DeleteNatGatewayResponse xmlns="http://ec2.amazonaws.com/doc/2015-10-01/"> + <requestId>741fc8ab-6ebe-452b-b92b-example</requestId> + <natGatewayId>nat-04ae55e711cec5680</natGatewayId> + </DeleteNatGatewayResponse> + """ + + def test_delete_nat_gateway(self): + self.set_http_response(status_code=200) + api_response = self.service_connection.delete_nat_gateway('nat-04ae55e711cec5680') + self.assert_request_parameters({ + 'Action': 'DeleteNatGateway', + 'NatGatewayId': 'nat-04ae55e711cec5680'}, + ignore_params_values=['AWSAccessKeyId', 'SignatureMethod', + 'SignatureVersion', 'Timestamp', + 'Version']) + self.assertEquals(api_response, True) + +if __name__ == '__main__': + unittest.main()
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2