Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into master-add-fabric-monitor-tables
Browse files Browse the repository at this point in the history
jfeng-arista authored Sep 5, 2023
2 parents 64e104d + dc14d36 commit b3e2a4c
Showing 2 changed files with 76 additions and 21 deletions.
7 changes: 5 additions & 2 deletions common/configdb.h
Original file line number Diff line number Diff line change
@@ -105,8 +105,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
try:
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if table in self.handlers:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if item['data'] == 'del':
data = None
else:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
90 changes: 71 additions & 19 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
@@ -634,46 +634,98 @@ def thread_coming_entry():
def test_ConfigDBInit():
table_name_1 = 'TEST_TABLE_1'
table_name_2 = 'TEST_TABLE_2'
table_name_3 = 'TEST_TABLE_3'
test_key = 'key1'
test_data = {'field1': 'value1'}
test_data_update = {'field1': 'value2'}
test_data = {'field1': 'value1', 'field2': 'value2'}

queue = multiprocessing.Queue()

manager = multiprocessing.Manager()
ret_data = manager.dict()

def test_handler(table, key, data, ret):
ret[table] = {key: data}

def test_init_handler(data, ret):
def test_handler(table, key, data, ret, q=None):
if table not in ret:
ret[table] = {}
if data is None:
ret[table] = {k: v for k, v in ret[table].items() if k != key}
if q:
q.put(ret[table])
elif key not in ret[table] or ret[table][key] != data:
ret[table] = {key: data}
if q:
q.put(ret[table])

def test_init_handler(data, ret, queue):
ret.update(data)
queue.put(ret)

def thread_listen(ret):
def thread_listen(ret, queue):
config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)

config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=True)
config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)

config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue))

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
client = config_db.get_redis_client(config_db.CONFIG_DB)
client.flushdb()

# Init table data
config_db.set_entry(table_name_1, test_key, test_data)
config_db.set_entry(table_name_2, test_key, test_data)
# Prepare unique data per each table to track if correct data are received in the update
table_1_data = {f'{table_name_1}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_1, test_key, table_1_data)
table_2_data = {f'{table_name_2}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_2, test_key, table_2_data)
config_db.set_entry(table_name_3, test_key, {})

thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
thread.start()
time.sleep(5)
thread.terminate()
# Run the listener in a separate process. It is not possible to stop a listener when it is running as a thread.
# When it runs in a separate process we can terminate it with a signal.
listener = multiprocessing.Process(target=thread_listen, args=(ret_data, queue))
listener.start()

assert ret_data[table_name_1] == {test_key: test_data}
assert ret_data[table_name_2] == {test_key: test_data}
try:
# During the subscription to table 2 'fire_init_data=True' is passed. The callback should be called before the listener.
# Verify that callback is fired before listener initialization.
data = queue.get(timeout=5)
assert data == {test_key: table_2_data}

# Wait for init data
init_data = queue.get(timeout=5)

# Verify that all tables initialized correctly
assert init_data[table_name_1] == {test_key: table_1_data}
assert init_data[table_name_2] == {test_key: table_2_data}
assert init_data[table_name_3] == {test_key: {}}

# Remove one key-value pair from the data. Verify that the entry is updated correctly
table_1_data.popitem()
config_db.set_entry(table_name_1, test_key, table_1_data)
data = queue.get(timeout=5)
assert data == {test_key: table_1_data}

# Remove all key-value pairs. Verify that the table still contains key
config_db.set_entry(table_name_1, test_key, {})
data = queue.get(timeout=5)
assert data == {test_key: {}}

# Remove the key
config_db.set_entry(table_name_1, test_key, None)
data = queue.get(timeout=5)
assert test_key not in data

# Remove the entry (with no attributes) from the table.
# Verify that the update is received and a callback is called
config_db.set_entry(table_name_3, test_key, None)
table_3_data = queue.get(timeout=5)
assert test_key not in table_3_data
finally:
listener.terminate()


def test_DBConnectFailure():

0 comments on commit b3e2a4c

Please sign in to comment.