1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
import os
from awscli import clidriver
from awscli.autocomplete import db, generator
from awscli.autocomplete.local.indexer import ModelIndexer
from awscli.autocomplete.serverside import model
from awscli.autocomplete.serverside.indexer import APICallIndexer
from awscli.testutils import unittest
def _ddb_only_command_table(command_table, **kwargs):
    """Prune ``command_table`` in place so only ``dynamodb`` remains.

    The tests in this module register this function as a
    ``building-command-table.main`` handler. Any extra keyword arguments
    passed along with the event are accepted and ignored.
    """
    # Gather the names up front so the mapping is never resized while it
    # is being iterated.
    unwanted = [name for name in command_table if name != 'dynamodb']
    for name in unwanted:
        del command_table[name]
class TestCanGenerateServerIndex(unittest.TestCase):
    """Generate a DynamoDB-only completion index once and query it.

    The index is built a single time in ``setUpClass`` into a
    ``:memory:`` database connection that every test in the class shares.
    """

    @classmethod
    def setUpClass(cls):
        """Build the shared index restricted to the ``dynamodb`` command."""
        cls.db_connection = db.DatabaseConnection(":memory:")
        index_generator = generator.IndexGenerator(
            [
                ModelIndexer(cls.db_connection),
                APICallIndexer(cls.db_connection),
            ],
        )
        driver = clidriver.create_clidriver()
        # Restrict the command table to dynamodb so only that service
        # gets indexed.
        driver.session.register(
            'building-command-table.main', _ddb_only_command_table
        )
        index_generator.generate_index(driver)

    @classmethod
    def tearDownClass(cls):
        """Close the shared connection opened in ``setUpClass``."""
        # Without this the connection is never released; the sibling
        # test class closes its connection in ``tearDown``.
        cls.db_connection.close()

    def test_can_query_model_from_index(self):
        """Known dynamodb parameters return their completion data."""
        lookup = model.DBCompletionLookup(self.db_connection)
        # We'll query a few lookups to ensure that we indexed them
        # correctly.
        result = lookup.get_server_completion_data(
            ['aws', 'dynamodb'], 'delete-table', 'table-name'
        )
        self.assertEqual(
            result,
            {
                'completions': [
                    {
                        'jp_expr': 'TableNames[]',
                        'operation': 'list_tables',
                        'parameters': {},
                        'service': 'dynamodb',
                    }
                ]
            },
        )
        result = lookup.get_server_completion_data(
            ['aws', 'dynamodb'], 'update-global-table', 'global-table-name'
        )
        self.assertEqual(
            result,
            {
                'completions': [
                    {
                        'jp_expr': 'GlobalTables[].GlobalTableName',
                        'operation': 'list_global_tables',
                        'parameters': {},
                        'service': 'dynamodb',
                    }
                ]
            },
        )

    def test_returns_none_if_no_lookup_data_found(self):
        """Unknown parameter, operation or service each yield ``None``."""
        lookup = model.DBCompletionLookup(self.db_connection)
        self.assertIsNone(
            lookup.get_server_completion_data(
                ['aws', 'dynamodb'], 'delete-table', 'unknown-param'
            )
        )
        self.assertIsNone(
            lookup.get_server_completion_data(
                ['aws', 'dynamodb'], 'unknown-operation', 'foo'
            )
        )
        self.assertIsNone(
            lookup.get_server_completion_data(
                ['aws', 'unknown-service'], 'unknown-operation', 'foo'
            )
        )
class TestCanHandleNoCompletionData(unittest.TestCase):
    """Check that index generation copes with missing completion data."""

    def setUp(self):
        """Open a fresh ``:memory:`` database connection for the test."""
        self.db_connection = db.DatabaseConnection(":memory:")

    def tearDown(self):
        """Close the connection opened in ``setUp``."""
        self.db_connection.close()

    def _disable_cli_loaders(self, event_name, session, **kwargs):
        """Remove the CLI's bundled data path from the session's loader.

        Used as an event handler: every entry in the data loader's
        ``search_paths`` that ends with ``awscli/botocore/data`` (joined
        with the platform separator) is removed.
        """
        cli_data_suffix = os.path.join('awscli', 'botocore', 'data')
        data_loader = session.get_component('data_loader')
        # Walk a snapshot because matching entries are removed from the
        # live list as we go.
        for candidate in list(data_loader.search_paths):
            if not candidate.endswith(cli_data_suffix):
                continue
            data_loader.search_paths.remove(candidate)

    def test_no_errors_when_missing_completion_data(self):
        """Indexing succeeds and the lookup finds nothing without data."""
        indexers = [
            ModelIndexer(self.db_connection),
            APICallIndexer(self.db_connection),
        ]
        index_generator = generator.IndexGenerator(indexers)
        driver = clidriver.create_clidriver()
        # Removing the CLI data path from the loader means no completion
        # data can be found, which lets us verify the behavior when
        # there is no completion data.
        handlers = (
            ('building-command-table.main', _ddb_only_command_table),
            ('building-command-table.dynamodb', self._disable_cli_loaders),
        )
        for event_name, handler in handlers:
            driver.session.register(event_name, handler)
        index_generator.generate_index(driver)
        # Nothing should come back now because the completion data could
        # not be loaded.
        lookup = model.DBCompletionLookup(self.db_connection)
        self.assertIsNone(
            lookup.get_server_completion_data(
                ['aws', 'dynamodb'], 'delete-table', 'table-name'
            )
        )
|