Merge pull request #4 from dcarrillo/output-fields

Add --output-fields parameter
This commit is contained in:
Daniel Carrillo 2020-12-26 13:54:36 +01:00 committed by GitHub
commit d9c8738e9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 24 deletions

View File

@ -19,7 +19,7 @@ pip install digaws
## Usage ## Usage
```text ```text
usage: digaws [-h] [--output <plain|json>] [--debug] <ip address|cidr> [<ip address|cidr> ...] usage: digaws [-h] [--output <plain|json>] [--output-fields [{prefix,region,service,network_border_group} ...]] [--debug] [--version] <ip address|cidr> [<ip address|cidr> ...]
Look up canonical information for AWS IP addresses and networks Look up canonical information for AWS IP addresses and networks
@ -30,7 +30,10 @@ optional arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
--output <plain|json> --output <plain|json>
Formatting style for command output, by default plain Formatting style for command output, by default plain
--output-fields [{prefix,region,service,network_border_group} ...]
Print only the given fields
--debug Enable debug --debug Enable debug
--version show program's version number and exit
``` ```
## Examples ## Examples
@ -104,3 +107,14 @@ Network border group: sa-east-1
} }
] ]
``` ```
- Choose output fields
```text
~ » digaws 34.255.166.63 --output-fields service
Service: AMAZON
Service: EC2
```

View File

@ -20,6 +20,7 @@ from . import __version__
AWS_IP_RANGES_URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json' AWS_IP_RANGES_URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json'
CACHE_DIR = Path(Path.home() / '.digaws') CACHE_DIR = Path(Path.home() / '.digaws')
CACHE_FILE = CACHE_DIR / 'ip-ranges.json' CACHE_FILE = CACHE_DIR / 'ip-ranges.json'
OUTPUT_FIELDS = ['prefix', 'region', 'service', 'network_border_group']
logger = logging.getLogger() logger = logging.getLogger()
handler = logging.StreamHandler(sys.stderr) handler = logging.StreamHandler(sys.stderr)
@ -87,19 +88,23 @@ class UnexpectedRequestException(Exception):
class DigAWSPrettyPrinter: class DigAWSPrettyPrinter:
def __init__(self, data: List[Dict], output_filter: List[str] = []): def __init__(self, data: List[Dict], output_fields: List[str] = []):
self.data = data self.data = data
self.output_filter = output_filter self.output_fields = output_fields
def plain_print(self) -> None: def plain_print(self) -> None:
for prefix in self.data: for prefix in self.data:
try: if 'prefix' in self.output_fields:
print(f'Prefix: {prefix["ip_prefix"]}') try:
except KeyError: print(f'Prefix: {prefix["ip_prefix"]}')
print(f'IPv6 Prefix: {prefix["ipv6_prefix"]}') except KeyError:
print(f'Region: {prefix["region"]}') print(f'IPv6 Prefix: {prefix["ipv6_prefix"]}')
print(f'Service: {prefix["service"]}') if 'region' in self.output_fields:
print(f'Network border group: {prefix["network_border_group"]}') print(f'Region: {prefix["region"]}')
if 'service' in self.output_fields:
print(f'Service: {prefix["service"]}')
if 'network_border_group' in self.output_fields:
print(f'Network border group: {prefix["network_border_group"]}')
print('') print('')
def json_print(self) -> None: def json_print(self) -> None:
@ -110,22 +115,26 @@ class DigAWSPrettyPrinter:
prefix_type = 'ip_prefix' prefix_type = 'ip_prefix'
except KeyError: except KeyError:
prefix_type = 'ipv6_prefix' prefix_type = 'ipv6_prefix'
data.append(
{ item_dict = {}
prefix_type: str(prefix[prefix_type]), if 'prefix' in self.output_fields:
'region': prefix['region'], item_dict.update({prefix_type: str(prefix[prefix_type])})
'service': prefix['service'], if 'region' in self.output_fields:
'network_border_group': prefix['network_border_group'] item_dict.update({'region': prefix['region']})
} if 'service' in self.output_fields:
) item_dict.update({'service': prefix['service']})
if 'network_border_group' in self.output_fields:
item_dict.update({'network_border_group': prefix['network_border_group']})
data.append(item_dict)
if data: if data:
print(json.dumps(data, indent=2)) print(json.dumps(data, indent=2))
class DigAWS(): class DigAWS():
def __init__(self, ip_ranges: Dict, output: str = 'plain', output_filter: List[str] = []): def __init__(self, *, ip_ranges: Dict, output: str = 'plain', output_fields: List[str] = []):
self.output = output self.output = output
self.output_filter = output_filter self.output_fields = output_fields
self.ip_prefixes = [ self.ip_prefixes = [
{ {
'ip_prefix': ipaddress.IPv4Network(prefix['ip_prefix']), 'ip_prefix': ipaddress.IPv4Network(prefix['ip_prefix']),
@ -148,7 +157,7 @@ class DigAWS():
def lookup(self, address: str) -> DigAWSPrettyPrinter: def lookup(self, address: str) -> DigAWSPrettyPrinter:
return DigAWSPrettyPrinter( return DigAWSPrettyPrinter(
self._lookup_data(address), self._lookup_data(address),
self.output_filter self.output_fields
) )
def _lookup_data(self, address: str) -> List[Dict]: def _lookup_data(self, address: str) -> List[Dict]:
@ -193,6 +202,15 @@ def arguments_parser() -> argparse.ArgumentParser:
default='plain', default='plain',
help='Formatting style for command output, by default %(default)s' help='Formatting style for command output, by default %(default)s'
) )
parser.add_argument(
'--output-fields',
nargs='*',
choices=OUTPUT_FIELDS,
required=False,
dest='output_fields',
default=OUTPUT_FIELDS,
help='Print only the given fields'
)
parser.add_argument( parser.add_argument(
'--debug', '--debug',
action='store_true', action='store_true',
@ -225,7 +243,7 @@ def main():
try: try:
ip_ranges = get_aws_ip_ranges() ip_ranges = get_aws_ip_ranges()
dig = DigAWS(ip_ranges) dig = DigAWS(ip_ranges=ip_ranges, output_fields=args.output_fields)
responses = [] responses = []
for address in args.addresses: for address in args.addresses:
@ -242,7 +260,7 @@ def main():
for response in responses: for response in responses:
joined += response.data joined += response.data
DigAWSPrettyPrinter(joined).json_print() DigAWSPrettyPrinter(joined, args.output_fields).json_print()
except ( except (
RequestException, RequestException,
ipaddress.AddressValueError, ipaddress.AddressValueError,

View File

@ -111,6 +111,18 @@ RESPONSE_JSON_PRINT = '''[
] ]
''' '''
RESPONSE_JSON_FIELDS_PRINT = '''[
{
"service": "ROUTE53_HEALTHCHECKS",
"network_border_group": "us-west-2"
},
{
"service": "EC2",
"network_border_group": "us-west-2"
}
]
'''
RESPONSE_JSON_JOINED_PRINT = '''[ RESPONSE_JSON_JOINED_PRINT = '''[
{ {
"ip_prefix": "52.94.76.0/22", "ip_prefix": "52.94.76.0/22",

View File

@ -11,7 +11,10 @@ import tests
@pytest.fixture @pytest.fixture
def test_dig(): def test_dig():
return digaws.DigAWS(json.loads(tests.AWS_IP_RANGES)) return digaws.DigAWS(ip_ranges=json.loads(
tests.AWS_IP_RANGES),
output_fields=digaws.OUTPUT_FIELDS
)
def test_cli(capsys): def test_cli(capsys):
@ -44,6 +47,25 @@ def test_cli_invocation(capsys, mocker):
assert out == tests.RESPONSE_JSON_JOINED_PRINT assert out == tests.RESPONSE_JSON_JOINED_PRINT
def test_cli_output_plain_fields_invocation(capsys, mocker):
sys.argv = ['digaws', '52.94.76.0/22', '--output=plain', '--output-fields', 'region']
mocker.patch('digaws.digaws.get_aws_ip_ranges', return_value=json.loads(tests.AWS_IP_RANGES))
digaws.main()
out, _ = capsys.readouterr()
assert out == 'Region: us-west-2\n\n'
def test_cli_output_json_fields_invocation(capsys, mocker):
sys.argv = ['digaws', '2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8', '--output=json',
'--output-fields', 'service', 'network_border_group']
mocker.patch('digaws.digaws.get_aws_ip_ranges', return_value=json.loads(tests.AWS_IP_RANGES))
digaws.main()
out, _ = capsys.readouterr()
assert out == tests.RESPONSE_JSON_FIELDS_PRINT
def test_dig_aws_construct(test_dig): def test_dig_aws_construct(test_dig):
assert test_dig.ip_prefixes == tests.AWS_IPV4_RANGES_OBJ assert test_dig.ip_prefixes == tests.AWS_IPV4_RANGES_OBJ
assert test_dig.ipv6_prefixes == tests.AWS_IPV6_RANGES_OBJ assert test_dig.ipv6_prefixes == tests.AWS_IPV6_RANGES_OBJ