From a90dbd01230250664d1174b3987fc483dab4d6f0 Mon Sep 17 00:00:00 2001 From: dcarrillo Date: Sat, 8 Jun 2024 13:43:41 +0200 Subject: [PATCH] chore: Use black formatter (#9) --- .github/workflows/main.yml | 8 +- digaws/__init__.py | 4 +- digaws/digaws.py | 191 +++++++++++++-------------- noxfile.py | 31 ++--- setup.py | 38 +++--- tests/__init__.py | 94 ++++++------- tests/test_dig_aws.py | 71 ++++++---- tests/test_get_aws_ip_ranges_json.py | 38 +++--- 8 files changed, 240 insertions(+), 235 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5c03d07..f0a04ba 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 @@ -43,15 +43,15 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up Python 3.11 + - name: Set up Python 3.12 uses: actions/setup-python@v5 with: - python-version: 3.11 + python-version: 3.12 - name: Install tools run: | python -m pip install --upgrade pip - pip install twine wheel + pip install twine wheel setuptools - name: Build run: | diff --git a/digaws/__init__.py b/digaws/__init__.py index daa8043..a591d6f 100644 --- a/digaws/__init__.py +++ b/digaws/__init__.py @@ -1,2 +1,2 @@ -__version__ = '1.0.6' -__description__ = 'Look up canonical information for AWS IP addresses and networks' +__version__ = "1.0.7" +__description__ = "Look up canonical information for AWS IP addresses and networks" diff --git a/digaws/digaws.py b/digaws/digaws.py index e491892..4541802 100755 --- a/digaws/digaws.py +++ b/digaws/digaws.py @@ -9,23 +9,21 @@ from datetime import datetime from pathlib import Path from typing import Any, Dict, List -from dateutil import tz - import requests +from dateutil import tz from requests.exceptions import RequestException -from . import __description__ -from . import __version__ +from . import __description__, __version__ -AWS_IP_RANGES_URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json' -CACHE_DIR = Path(Path.home() / '.digaws') -CACHE_FILE = CACHE_DIR / 'ip-ranges.json' -OUTPUT_FIELDS = ['prefix', 'region', 'service', 'network_border_group'] +AWS_IP_RANGES_URL = "https://ip-ranges.amazonaws.com/ip-ranges.json" +CACHE_DIR = Path(Path.home() / ".digaws") +CACHE_FILE = CACHE_DIR / "ip-ranges.json" +OUTPUT_FIELDS = ["prefix", "region", "service", "network_border_group"] logger = logging.getLogger() handler = logging.StreamHandler(sys.stderr) logger.addHandler(handler) -handler.setFormatter(logging.Formatter('-- %(levelname)s -- %(message)s')) +handler.setFormatter(logging.Formatter("-- %(levelname)s -- %(message)s")) logger.setLevel(logging.INFO) @@ -34,51 +32,49 @@ def get_aws_ip_ranges() -> Dict: headers = {} try: - file_time = datetime.fromtimestamp( - CACHE_FILE.stat().st_mtime, - tz=tz.UTC).strftime('%a, %d %b %Y %H:%M:%S GMT') - logger.debug(f'cached file modification time: {file_time}') - headers = {'If-Modified-Since': file_time} + file_time = datetime.fromtimestamp(CACHE_FILE.stat().st_mtime, tz=tz.UTC).strftime( + "%a, %d %b %Y %H:%M:%S GMT" + ) + logger.debug(f"cached file modification time: {file_time}") + headers = {"If-Modified-Since": file_time} except FileNotFoundError as e: - logger.debug(f'Not found: {CACHE_FILE}: {e}') + logger.debug(f"Not found: {CACHE_FILE}: {e}") pass try: - response = requests.get( - url=AWS_IP_RANGES_URL, - timeout=5, - headers=headers - ) + response = requests.get(url=AWS_IP_RANGES_URL, timeout=5, headers=headers) if response.status_code == 304: try: - logger.debug(f'reading cached file {CACHE_FILE}') + logger.debug(f"reading cached file {CACHE_FILE}") with open(CACHE_FILE) as ip_ranges: return json.load(ip_ranges) except (OSError, IOError, json.JSONDecodeError) as e: - logger.debug(f'ERROR reading {CACHE_FILE}: {e}') + logger.debug(f"ERROR reading {CACHE_FILE}: {e}") raise CachedFileException(str(e)) elif response.status_code == 200: try: - with open(CACHE_FILE, 'w') as f: + with open(CACHE_FILE, "w") as f: f.write(response.text) except (OSError, IOError) as e: logger.warning(e) return response.json() else: - msg = f'Unexpected response from {AWS_IP_RANGES_URL}. Status code: ' \ - f'{response.status_code}. Content: {response.text}' + msg = ( + f"Unexpected response from {AWS_IP_RANGES_URL}. Status code: " + f"{response.status_code}. Content: {response.text}" + ) logger.debug(msg) raise UnexpectedRequestException(msg) except RequestException as e: - logger.debug(f'ERROR retrieving {AWS_IP_RANGES_URL}: {e}') + logger.debug(f"ERROR retrieving {AWS_IP_RANGES_URL}: {e}") raise e class CachedFileException(Exception): def __init__(self, message: str): - message = f'Error reading cached ranges {CACHE_FILE}: {message}' + message = f"Error reading cached ranges {CACHE_FILE}: {message}" super(CachedFileException, self).__init__(message) @@ -94,141 +90,137 @@ class DigAWSPrettyPrinter: def plain_print(self) -> None: for prefix in self.data: - if 'prefix' in self.output_fields: + if "prefix" in self.output_fields: try: print(f'Prefix: {prefix["ip_prefix"]}') except KeyError: print(f'IPv6 Prefix: {prefix["ipv6_prefix"]}') - if 'region' in self.output_fields: + if "region" in self.output_fields: print(f'Region: {prefix["region"]}') - if 'service' in self.output_fields: + if "service" in self.output_fields: print(f'Service: {prefix["service"]}') - if 'network_border_group' in self.output_fields: + if "network_border_group" in self.output_fields: print(f'Network border group: {prefix["network_border_group"]}') - print('') + print("") def json_print(self) -> None: data = [] for prefix in self.data: try: - prefix['ip_prefix'] - prefix_type = 'ip_prefix' + prefix["ip_prefix"] + prefix_type = "ip_prefix" except KeyError: - prefix_type = 'ipv6_prefix' + prefix_type = "ipv6_prefix" item_dict = {} - if 'prefix' in self.output_fields: + if "prefix" in self.output_fields: item_dict.update({prefix_type: str(prefix[prefix_type])}) - if 'region' in self.output_fields: - item_dict.update({'region': prefix['region']}) - if 'service' in self.output_fields: - item_dict.update({'service': prefix['service']}) - if 'network_border_group' in self.output_fields: - item_dict.update({'network_border_group': prefix['network_border_group']}) + if "region" in self.output_fields: + item_dict.update({"region": prefix["region"]}) + if "service" in self.output_fields: + item_dict.update({"service": prefix["service"]}) + if "network_border_group" in self.output_fields: + item_dict.update({"network_border_group": prefix["network_border_group"]}) data.append(item_dict) print(json.dumps(data, indent=2)) -class DigAWS(): - def __init__(self, *, ip_ranges: Dict, output: str = 'plain', output_fields: List[str] = []): +class DigAWS: + def __init__(self, *, ip_ranges: Dict, output: str = "plain", output_fields: List[str] = []): self.output = output self.output_fields = output_fields self.ip_prefixes = [ { - 'ip_prefix': ipaddress.IPv4Network(prefix['ip_prefix']), - 'region': prefix['region'], - 'service': prefix['service'], - 'network_border_group': prefix['network_border_group'] + "ip_prefix": ipaddress.IPv4Network(prefix["ip_prefix"]), + "region": prefix["region"], + "service": prefix["service"], + "network_border_group": prefix["network_border_group"], } - for prefix in ip_ranges['prefixes'] + for prefix in ip_ranges["prefixes"] ] self.ipv6_prefixes = [ { - 'ipv6_prefix': ipaddress.IPv6Network(prefix['ipv6_prefix']), - 'region': prefix['region'], - 'service': prefix['service'], - 'network_border_group': prefix['network_border_group'] + "ipv6_prefix": ipaddress.IPv6Network(prefix["ipv6_prefix"]), + "region": prefix["region"], + "service": prefix["service"], + "network_border_group": prefix["network_border_group"], } - for prefix in ip_ranges['ipv6_prefixes'] + for prefix in ip_ranges["ipv6_prefixes"] ] def lookup(self, address: str) -> DigAWSPrettyPrinter: - return DigAWSPrettyPrinter( - self._lookup_data(address), - self.output_fields - ) + return DigAWSPrettyPrinter(self._lookup_data(address), self.output_fields) def _lookup_data(self, address: str) -> List[Dict]: addr: Any = None try: addr = ipaddress.IPv4Address(address) - data = [prefix for prefix in self.ip_prefixes - if addr in prefix['ip_prefix']] + data = [prefix for prefix in self.ip_prefixes if addr in prefix["ip_prefix"]] except ipaddress.AddressValueError: try: addr = ipaddress.IPv6Address(address) - data = [prefix for prefix in self.ipv6_prefixes - if addr in prefix['ipv6_prefix']] + data = [prefix for prefix in self.ipv6_prefixes if addr in prefix["ipv6_prefix"]] except ipaddress.AddressValueError: try: addr = ipaddress.IPv4Network(address) - data = [prefix for prefix in self.ip_prefixes - if addr.subnet_of(prefix['ip_prefix'])] + data = [ + prefix for prefix in self.ip_prefixes if addr.subnet_of(prefix["ip_prefix"]) + ] except (ipaddress.AddressValueError, ValueError): try: addr = ipaddress.IPv6Network(address) - data = [prefix for prefix in self.ipv6_prefixes - if addr.subnet_of(prefix['ipv6_prefix'])] + data = [ + prefix + for prefix in self.ipv6_prefixes + if addr.subnet_of(prefix["ipv6_prefix"]) + ] except (ipaddress.AddressValueError, ValueError): - raise ValueError(f'Wrong IP or CIDR format: {address}') + raise ValueError(f"Wrong IP or CIDR format: {address}") return data def arguments_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - add_help=True, - description=__description__ - ) + parser = argparse.ArgumentParser(add_help=True, description=__description__) parser.add_argument( - '--output', - metavar='', - choices=['plain', 'json'], + "--output", + metavar="", + choices=["plain", "json"], type=str, required=False, - dest='output', - default='plain', - help='Formatting style for command output, by default %(default)s' + dest="output", + default="plain", + help="Formatting style for command output, by default %(default)s", ) parser.add_argument( - '--output-fields', - nargs='*', + "--output-fields", + nargs="*", choices=OUTPUT_FIELDS, required=False, - dest='output_fields', + dest="output_fields", default=OUTPUT_FIELDS, - help='Print only the given fields' + help="Print only the given fields", ) parser.add_argument( - '--debug', - action='store_true', + "--debug", + action="store_true", required=False, default=False, - dest='debug', - help='Enable debug' + dest="debug", + help="Enable debug", ) parser.add_argument( - '--version', - action='version', - version='%(prog)s {version}'.format(version=__version__) + "--version", + action="version", + version="%(prog)s {version}".format(version=__version__), ) parser.add_argument( - 'addresses', - nargs='+', - metavar='', + "addresses", + nargs="+", + metavar="", type=str, - help='CIDR or IP (v4 or v6) to look up' + help="CIDR or IP (v4 or v6) to look up", ) return parser @@ -248,7 +240,7 @@ def main(): for address in args.addresses: responses.append(dig.lookup(address)) - if args.output == 'plain': + if args.output == "plain": for response in responses: response.plain_print() else: @@ -261,12 +253,13 @@ def main(): DigAWSPrettyPrinter(joined, args.output_fields).json_print() except ( - RequestException, - ipaddress.AddressValueError, - ValueError, - CachedFileException, - UnexpectedRequestException) as e: - print(f'ERROR: {e}') + RequestException, + ipaddress.AddressValueError, + ValueError, + CachedFileException, + UnexpectedRequestException, + ) as e: + print(f"ERROR: {e}") sys.exit(1) diff --git a/noxfile.py b/noxfile.py index 22e1994..bb04cb0 100644 --- a/noxfile.py +++ b/noxfile.py @@ -1,34 +1,35 @@ import nox +nox.options.sessions = ["lint", "typing", "tests"] +locations = ["noxfile.py", "setup.py", "digaws/", "tests/"] -nox.options.sessions = ['lint', 'typing', 'tests'] -locations = ['noxfile.py', 'setup.py', 'digaws/', 'tests/'] +lint_common_args = ["--max-line-length", "100"] +black_args = ["--line-length", "100"] +mypy_args = ["--ignore-missing-imports", "--install-types", "--non-interactive"] -lint_common_args = ['--max-line-length', '120'] -mypy_args = ['--ignore-missing-imports', '--install-types', '--non-interactive'] - -pytest_args = ['--cov=digaws', '--cov-report=', 'tests/'] -coverage_args = ['report', '--show-missing', '--fail-under=80'] +pytest_args = ["--cov=digaws", "--cov-report=", "tests/"] +coverage_args = ["report", "--show-missing", "--fail-under=80"] @nox.session() def lint(session): args = session.posargs or locations - session.install('pycodestyle', 'flake8', 'flake8-import-order') - session.run('pycodestyle', *(lint_common_args + args)) - session.run('flake8', *(lint_common_args + args)) + session.install("pycodestyle", "flake8", "black") + session.run("pycodestyle", *(lint_common_args + args)) + session.run("flake8", *(lint_common_args + args)) + session.run("black", "--check", *(black_args + args)) @nox.session() def typing(session): args = session.posargs or locations - session.install('mypy') - session.run('mypy', *(mypy_args + args)) + session.install("mypy") + session.run("mypy", *(mypy_args + args)) @nox.session() def tests(session): args = session.posargs - session.install('-r', 'requirements_test.txt') - session.run('pytest', *(pytest_args + args)) - session.run('coverage', *coverage_args) + session.install("-r", "requirements_test.txt") + session.run("pytest", *(pytest_args + args)) + session.run("coverage", *coverage_args) diff --git a/setup.py b/setup.py index 2c2e876..88a6a6d 100644 --- a/setup.py +++ b/setup.py @@ -1,36 +1,34 @@ -from digaws import __description__, __version__ - from setuptools import setup +from digaws import __description__, __version__ + def get_long_description() -> str: - with open('README.md', 'r', encoding='utf-8') as fh: + with open("README.md", "r", encoding="utf-8") as fh: return fh.read() setup( - name='digaws', + name="digaws", version=__version__, description=__description__, long_description=get_long_description(), - long_description_content_type='text/markdown', - url='http://github.com/dcarrillo/digaws', - author='Daniel Carrillo', - author_email='daniel.carrillo@gmail.com', - license='Apache Software License', - packages=['digaws'], + long_description_content_type="text/markdown", + url="http://github.com/dcarrillo/digaws", + author="Daniel Carrillo", + author_email="daniel.carrillo@gmail.com", + license="Apache Software License", + packages=["digaws"], zip_safe=False, classifiers=[ - 'Programming Language :: Python :: 3', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: OS Independent', + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", ], - python_requires='>=3.8', - entry_points={ - 'console_scripts': ['digaws=digaws.digaws:main'] - }, + python_requires=">=3.8", + entry_points={"console_scripts": ["digaws=digaws.digaws:main"]}, install_requires=[ - 'python-dateutil~=2.8', - 'requests~=2.25', - ] + "python-dateutil~=2.8", + "requests~=2.25", + ], ) diff --git a/tests/__init__.py b/tests/__init__.py index 5485a04..526784c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,6 +1,6 @@ import ipaddress -AWS_IP_RANGES = ''' +AWS_IP_RANGES = """ { "syncToken": "1608245058", "createDate": "2020-12-17-22-44-18", @@ -45,57 +45,57 @@ AWS_IP_RANGES = ''' } ] } -''' +""" AWS_IPV4_RANGES_OBJ = [ - { - 'ip_prefix': ipaddress.IPv4Network('52.93.178.234/32'), - 'region': 'us-west-1', - 'service': 'AMAZON', - 'network_border_group': 'us-west-1' - }, - { - 'ip_prefix': ipaddress.IPv4Network('52.94.76.0/22'), - 'region': 'us-west-2', - 'service': 'AMAZON', - 'network_border_group': 'us-west-2' - } + { + "ip_prefix": ipaddress.IPv4Network("52.93.178.234/32"), + "region": "us-west-1", + "service": "AMAZON", + "network_border_group": "us-west-1", + }, + { + "ip_prefix": ipaddress.IPv4Network("52.94.76.0/22"), + "region": "us-west-2", + "service": "AMAZON", + "network_border_group": "us-west-2", + }, ] AWS_IPV6_RANGES_OBJ = [ - { - 'ipv6_prefix': ipaddress.IPv6Network('2600:1f00:c000::/40'), - 'region': 'us-west-1', - 'service': 'AMAZON', - 'network_border_group': 'us-west-1' - }, - { - 'ipv6_prefix': ipaddress.IPv6Network('2600:1f01:4874::/47'), - 'region': 'us-west-2', - 'service': 'AMAZON', - 'network_border_group': 'us-west-2' - }, - { - 'ipv6_prefix': ipaddress.IPv6Network('2600:1f14:fff:f800::/53'), - 'region': 'us-west-2', - 'service': 'ROUTE53_HEALTHCHECKS', - 'network_border_group': 'us-west-2' - }, - { - 'ipv6_prefix': ipaddress.IPv6Network('2600:1f14::/35'), - 'region': 'us-west-2', - 'service': 'EC2', - 'network_border_group': 'us-west-2' - } + { + "ipv6_prefix": ipaddress.IPv6Network("2600:1f00:c000::/40"), + "region": "us-west-1", + "service": "AMAZON", + "network_border_group": "us-west-1", + }, + { + "ipv6_prefix": ipaddress.IPv6Network("2600:1f01:4874::/47"), + "region": "us-west-2", + "service": "AMAZON", + "network_border_group": "us-west-2", + }, + { + "ipv6_prefix": ipaddress.IPv6Network("2600:1f14:fff:f800::/53"), + "region": "us-west-2", + "service": "ROUTE53_HEALTHCHECKS", + "network_border_group": "us-west-2", + }, + { + "ipv6_prefix": ipaddress.IPv6Network("2600:1f14::/35"), + "region": "us-west-2", + "service": "EC2", + "network_border_group": "us-west-2", + }, ] -LAST_MODIFIED_TIME = 'Thu, 17 Dec 2020 23:22:33 GMT' +LAST_MODIFIED_TIME = "Thu, 17 Dec 2020 23:22:33 GMT" -RESPONSE_PLAIN_PRINT = '''Prefix: 52.94.76.0/22 +RESPONSE_PLAIN_PRINT = """Prefix: 52.94.76.0/22 Region: us-west-2 Service: AMAZON Network border group: us-west-2 -''' +""" -RESPONSE_JSON_PRINT = '''[ +RESPONSE_JSON_PRINT = """[ { "ipv6_prefix": "2600:1f14:fff:f800::/53", "region": "us-west-2", @@ -109,9 +109,9 @@ RESPONSE_JSON_PRINT = '''[ "network_border_group": "us-west-2" } ] -''' +""" -RESPONSE_JSON_FIELDS_PRINT = '''[ +RESPONSE_JSON_FIELDS_PRINT = """[ { "service": "ROUTE53_HEALTHCHECKS", "network_border_group": "us-west-2" @@ -121,9 +121,9 @@ RESPONSE_JSON_FIELDS_PRINT = '''[ "network_border_group": "us-west-2" } ] -''' +""" -RESPONSE_JSON_JOINED_PRINT = '''[ +RESPONSE_JSON_JOINED_PRINT = """[ { "ip_prefix": "52.94.76.0/22", "region": "us-west-2", @@ -143,4 +143,4 @@ RESPONSE_JSON_JOINED_PRINT = '''[ "network_border_group": "us-west-2" } ] -''' +""" diff --git a/tests/test_dig_aws.py b/tests/test_dig_aws.py index 8bf9588..87d0563 100644 --- a/tests/test_dig_aws.py +++ b/tests/test_dig_aws.py @@ -1,24 +1,22 @@ import json import sys -import digaws.digaws as digaws -from digaws import __description__, __version__ - import pytest +import digaws.digaws as digaws import tests +from digaws import __description__, __version__ @pytest.fixture def test_dig(): - return digaws.DigAWS(ip_ranges=json.loads( - tests.AWS_IP_RANGES), - output_fields=digaws.OUTPUT_FIELDS + return digaws.DigAWS( + ip_ranges=json.loads(tests.AWS_IP_RANGES), output_fields=digaws.OUTPUT_FIELDS ) def test_cli(capsys): - sys.argv = ['digaws', '-h'] + sys.argv = ["digaws", "-h"] try: digaws.main() except SystemExit as e: @@ -28,19 +26,24 @@ def test_cli(capsys): def test_cli_version(capsys, mocker): - sys.argv = ['digaws', '--version'] + sys.argv = ["digaws", "--version"] try: digaws.main() except SystemExit as e: out, _ = capsys.readouterr() - assert out == f'digaws {__version__}\n' + assert out == f"digaws {__version__}\n" assert e.code == 0 def test_cli_invocation(capsys, mocker): - sys.argv = ['digaws', '52.94.76.0/22', '2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8', - '--output', 'json'] - mocker.patch('digaws.digaws.get_aws_ip_ranges', return_value=json.loads(tests.AWS_IP_RANGES)) + sys.argv = [ + "digaws", + "52.94.76.0/22", + "2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8", + "--output", + "json", + ] + mocker.patch("digaws.digaws.get_aws_ip_ranges", return_value=json.loads(tests.AWS_IP_RANGES)) digaws.main() out, _ = capsys.readouterr() @@ -48,18 +51,30 @@ def test_cli_invocation(capsys, mocker): def test_cli_output_plain_fields_invocation(capsys, mocker): - sys.argv = ['digaws', '52.94.76.0/22', '--output=plain', '--output-fields', 'region'] - mocker.patch('digaws.digaws.get_aws_ip_ranges', return_value=json.loads(tests.AWS_IP_RANGES)) + sys.argv = [ + "digaws", + "52.94.76.0/22", + "--output=plain", + "--output-fields", + "region", + ] + mocker.patch("digaws.digaws.get_aws_ip_ranges", return_value=json.loads(tests.AWS_IP_RANGES)) digaws.main() out, _ = capsys.readouterr() - assert out == 'Region: us-west-2\n\n' + assert out == "Region: us-west-2\n\n" def test_cli_output_json_fields_invocation(capsys, mocker): - sys.argv = ['digaws', '2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8', '--output=json', - '--output-fields', 'service', 'network_border_group'] - mocker.patch('digaws.digaws.get_aws_ip_ranges', return_value=json.loads(tests.AWS_IP_RANGES)) + sys.argv = [ + "digaws", + "2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8", + "--output=json", + "--output-fields", + "service", + "network_border_group", + ] + mocker.patch("digaws.digaws.get_aws_ip_ranges", return_value=json.loads(tests.AWS_IP_RANGES)) digaws.main() out, _ = capsys.readouterr() @@ -72,28 +87,28 @@ def test_dig_aws_construct(test_dig): def test_lookup(test_dig): - assert str(test_dig._lookup_data('52.94.76.1')[0]['ip_prefix']) == '52.94.76.0/22' - assert str(test_dig._lookup_data('52.94.76.0/24')[0]['ip_prefix']) == '52.94.76.0/22' + assert str(test_dig._lookup_data("52.94.76.1")[0]["ip_prefix"]) == "52.94.76.0/22" + assert str(test_dig._lookup_data("52.94.76.0/24")[0]["ip_prefix"]) == "52.94.76.0/22" - input = '2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8' - assert str(test_dig._lookup_data(input)[0]['ipv6_prefix']) == '2600:1f14:fff:f800::/53' - assert str(test_dig._lookup_data(input)[1]['ipv6_prefix']) == '2600:1f14::/35' - assert str(test_dig._lookup_data('2600:1f14::/36')[0]['ipv6_prefix']) == '2600:1f14::/35' + input = "2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8" + assert str(test_dig._lookup_data(input)[0]["ipv6_prefix"]) == "2600:1f14:fff:f800::/53" + assert str(test_dig._lookup_data(input)[1]["ipv6_prefix"]) == "2600:1f14::/35" + assert str(test_dig._lookup_data("2600:1f14::/36")[0]["ipv6_prefix"]) == "2600:1f14::/35" with pytest.raises(ValueError) as e: - test_dig.lookup('what are you talking about') - assert e.startswith('Wrong IP or CIDR format') + test_dig.lookup("what are you talking about") + assert e.startswith("Wrong IP or CIDR format") def test_response_plain_print(test_dig, capsys): - test_dig.lookup('52.94.76.0/22').plain_print() + test_dig.lookup("52.94.76.0/22").plain_print() out, _ = capsys.readouterr() assert out == tests.RESPONSE_PLAIN_PRINT def test_response_json_print(test_dig, capsys): - test_dig.lookup('2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8').json_print() + test_dig.lookup("2600:1f14:fff:f810:a1c1:f507:a2d1:2dd8").json_print() out, _ = capsys.readouterr() assert out == tests.RESPONSE_JSON_PRINT diff --git a/tests/test_get_aws_ip_ranges_json.py b/tests/test_get_aws_ip_ranges_json.py index b6a919c..76ccefa 100644 --- a/tests/test_get_aws_ip_ranges_json.py +++ b/tests/test_get_aws_ip_ranges_json.py @@ -1,12 +1,10 @@ import json import os -import digaws.digaws as digaws - import pytest - import requests +import digaws.digaws as digaws import tests @@ -28,61 +26,61 @@ def create_cache_dir(fs): digaws.CACHE_DIR.mkdir(parents=True) -@pytest.mark.parametrize('fs', [[None, [digaws]]], indirect=True) +@pytest.mark.parametrize("fs", [[None, [digaws]]], indirect=True) def test_get_aws_ip_ranges_cached_valid_file(mocker, fs, create_cache_dir) -> None: - with open(digaws.CACHE_FILE, 'w') as out: + with open(digaws.CACHE_FILE, "w") as out: out.write(tests.AWS_IP_RANGES) response = requests.Response response.status_code = 304 - mocker.patch('requests.get', return_value=response) + mocker.patch("requests.get", return_value=response) result = digaws.get_aws_ip_ranges() - assert result['syncToken'] == '1608245058' + assert result["syncToken"] == "1608245058" -@pytest.mark.parametrize('fs', [[None, [digaws]]], indirect=True) +@pytest.mark.parametrize("fs", [[None, [digaws]]], indirect=True) def test_get_aws_ip_ranges_cached_invalid_file(mocker, fs, create_cache_dir) -> None: - with open(digaws.CACHE_FILE, 'w'): + with open(digaws.CACHE_FILE, "w"): pass response = requests.Response response.status_code = 304 - mocker.patch('requests.get', return_value=response) + mocker.patch("requests.get", return_value=response) with pytest.raises(digaws.CachedFileException): digaws.get_aws_ip_ranges() -@pytest.mark.parametrize('fs', [[None, [digaws]]], indirect=True) +@pytest.mark.parametrize("fs", [[None, [digaws]]], indirect=True) def test_get_aws_ip_ranges_cached_deprecated_file(monkeypatch, fs, create_cache_dir) -> None: - with open(digaws.CACHE_FILE, 'w'): + with open(digaws.CACHE_FILE, "w"): pass digaws.CACHE_FILE.touch() os.utime(digaws.CACHE_FILE, times=(0, 0)) - monkeypatch.setattr(requests, 'get', mock_get) + monkeypatch.setattr(requests, "get", mock_get) result = digaws.get_aws_ip_ranges() - assert result['syncToken'] == '1608245058' + assert result["syncToken"] == "1608245058" -@pytest.mark.parametrize('fs', [[None, [digaws]]], indirect=True) +@pytest.mark.parametrize("fs", [[None, [digaws]]], indirect=True) def test_get_aws_ip_ranges_no_file(monkeypatch, fs, create_cache_dir) -> None: - monkeypatch.setattr(requests, 'get', mock_get) + monkeypatch.setattr(requests, "get", mock_get) result = digaws.get_aws_ip_ranges() - assert result['syncToken'] == '1608245058' + assert result["syncToken"] == "1608245058" -@pytest.mark.parametrize('fs', [[None, [digaws]]], indirect=True) +@pytest.mark.parametrize("fs", [[None, [digaws]]], indirect=True) def test_get_aws_ip_ranges_invalid_status(mocker, fs, create_cache_dir) -> None: response = requests.Response response.status_code = 301 - mocker.patch('requests.get', return_value=response) + mocker.patch("requests.get", return_value=response) with pytest.raises(digaws.UnexpectedRequestException) as e: digaws.get_aws_ip_ranges() - assert e.match('^Unexpected response from') + assert e.match("^Unexpected response from")