Skip to content

Commit

Permalink
Added AES 256 test cases and improved test helpers.
Browse files Browse the repository at this point in the history
  • Loading branch information
lextm committed Sep 9, 2024
1 parent 44d0124 commit 96676f9
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 2 deletions.
12 changes: 12 additions & 0 deletions pysnmp/entity/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pysnmp.proto.mpmod.rfc3412 import SnmpV3MessageProcessingModel
from pysnmp.proto.rfc1902 import OctetString
from pysnmp.proto.rfc3412 import MsgAndPduDispatcher
from pysnmp.proto.secmod.base import AbstractSecurityModel
from pysnmp.proto.secmod.rfc2576 import SnmpV1SecurityModel, SnmpV2cSecurityModel
from pysnmp.proto.secmod.rfc3414 import SnmpUSMSecurityModel

Expand Down Expand Up @@ -65,6 +66,7 @@ class SnmpEngine:
msgAndPduDsp: MsgAndPduDispatcher
snmpEngineId: OctetString
cache: Dict[str, Any]
securityModels: Dict[int, AbstractSecurityModel]

def __init__(
self,
Expand Down Expand Up @@ -177,6 +179,16 @@ def __init__(
def __repr__(self):
return f"{self.__class__.__name__}(snmpEngineID={self.snmpEngineID!r})"

def _close(self):
"""
Close the SNMP engine to test memory leak.
This method is intended for unit testing purposes only.
It closes the SNMP engine and checks if all associated resources are released.
"""
for securityModel in self.securityModels.values():
securityModel._close()

def openDispatcher(self, timeout: float = 0):
"""
Open the dispatcher used by SNMP engine.
Expand Down
7 changes: 6 additions & 1 deletion pysnmp/proto/mpmod/rfc3412.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://www.pysnmp.com/pysnmp/license.html
#
from __future__ import annotations
import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pysnmp.entity.engine import SnmpEngine

from pyasn1.codec.ber import decoder, eoo
from pyasn1.error import PyAsn1Error
Expand Down Expand Up @@ -348,7 +353,7 @@ def prepareOutgoingMessage(

def prepareResponseMessage(
self,
snmpEngine,
snmpEngine: SnmpEngine,
messageProcessingModel,
securityModel,
securityName,
Expand Down
11 changes: 11 additions & 0 deletions pysnmp/proto/secmod/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

class AbstractSecurityModel:
SECURITY_MODEL_ID = None
_cache: cache.Cache

def __init__(self):
self._cache = cache.Cache()
Expand Down Expand Up @@ -63,3 +64,13 @@ def releaseStateInformation(self, stateReference):

def receiveTimerTick(self, snmpEngine, timeNow):
pass

def _close(self):
"""
Close the security model to test memory leak.
This method is intended for unit testing purposes only.
It closes the security model and checks if all associated resources are released.
"""

raise error.ProtocolError("Security model %s not implemented" % self)
7 changes: 7 additions & 0 deletions pysnmp/proto/secmod/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://www.pysnmp.com/pysnmp/license.html
#
from typing import Any, Dict


from pysnmp import nextid
from pysnmp.proto import error


class Cache:
__stateReference = nextid.Integer(0xFFFFFF)
__cacheEntries: Dict[int, Any]

def __init__(self):
self.__cacheEntries = {}
Expand All @@ -28,3 +32,6 @@ def pop(self, stateReference):
)
del self.__cacheEntries[stateReference]
return securityData

def isEmpty(self):
return not bool(self.__cacheEntries)
9 changes: 9 additions & 0 deletions pysnmp/proto/secmod/rfc2576.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ def __init__(self):
) = self.__communityBranchId = self.__securityBranchId = -1
base.AbstractSecurityModel.__init__(self)

def _close(self):
"""
Close the security model to test memory leak.
This method is intended for unit testing purposes only.
It closes the security model and checks if all associated resources are released.
"""
pass

def _sec2com(self, snmpEngine, securityName, contextEngineId, contextName):
(
snmpTargetParamsSecurityName,
Expand Down
10 changes: 10 additions & 0 deletions pysnmp/proto/secmod/rfc3414/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ def __init__(self):
self.__expirationTimer = 0
self.__paramsBranchId = -1

def _close(self):
"""
Close the security model to test memory leak.
This method is intended for unit testing purposes only.
It closes the security model and checks if all associated resources are released.
"""
if self._cache and not self._cache.isEmpty():
raise ValueError("Cache is not empty")

def __sec2usr(self, snmpEngine, securityName, securityEngineID=None):
mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
(usmUserEngineID,) = mibBuilder.importSymbols(
Expand Down
12 changes: 11 additions & 1 deletion tests/agent_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ async def start_agent(
snmpEngine, "usr-none-none", config.USM_AUTH_NONE, config.USM_PRIV_NONE
)

config.addV3User(snmpEngine, "usr-sha-none", config.USM_AUTH_HMAC96_SHA, "authkey1")

config.addV3User(
snmpEngine,
"usr-sha-aes",
Expand All @@ -51,14 +53,22 @@ async def start_agent(
"privkey1",
)

config.addV3User(snmpEngine, "usr-sha-none", config.USM_AUTH_HMAC96_SHA, "authkey1")
config.addV3User(
snmpEngine,
"usr-sha-aes256",
config.USM_AUTH_HMAC96_SHA,
"authkey1",
config.USM_PRIV_CFB256_AES,
"privkey1",
)

# Allow read MIB access for this user / securityModels at VACM
config.addVacmUser(snmpEngine, 1, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.addVacmUser(snmpEngine, 2, "public", "noAuthNoPriv", (1, 3, 6), (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-none-none", "noAuthNoPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-none", "authNoPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-aes", "authPriv", (1, 3, 6))
config.addVacmUser(snmpEngine, 3, "usr-sha-aes256", "authPriv", (1, 3, 6))

# Configure SNMP context
snmpContext = context.SnmpContext(snmpEngine)
Expand Down
82 changes: 82 additions & 0 deletions tests/hlapi/asyncio/manager/cmdgen/test_usm_sha_aes256.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pytest
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp.proto.errind import DecryptionError, UnknownUserName, WrongDigest
from tests.agent_context import AGENT_PORT, AgentContextManager


@pytest.mark.asyncio
async def test_usm_sha_aes256():
async with AgentContextManager():
snmpEngine = SnmpEngine()
authData = UsmUserData(
"usr-sha-aes256",
"authkey1",
"privkey1",
authProtocol=USM_AUTH_HMAC96_SHA,
privProtocol=USM_PRIV_CFB256_AES,
)
errorIndication, errorStatus, errorIndex, varBinds = await getCmd(
snmpEngine,
authData,
await UdpTransportTarget.create(("localhost", AGENT_PORT), retries=0),
ContextData(),
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)

assert errorIndication is None
assert errorStatus == 0
assert len(varBinds) == 1
assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysDescr.0"
isinstance(varBinds[0][1], OctetString)

snmpEngine.closeDispatcher()


@pytest.mark.asyncio
async def test_usm_sha_aes256_wrong_auth():
async with AgentContextManager():
snmpEngine = SnmpEngine()
authData = UsmUserData(
"usr-sha-aes",
"authkey1",
"privkey1",
authProtocol=USM_AUTH_HMAC96_MD5, # wrongly use usmHMACMD5AuthProtocol
privProtocol=USM_PRIV_CFB256_AES,
)
errorIndication, errorStatus, errorIndex, varBinds = await getCmd(
snmpEngine,
authData,
await UdpTransportTarget.create(("localhost", AGENT_PORT), retries=0),
ContextData(),
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)

assert isinstance(errorIndication, WrongDigest)
assert str(errorIndication) == "Wrong SNMP PDU digest"

snmpEngine.closeDispatcher()


@pytest.mark.asyncio
async def test_usm_sha_aes256_wrong_user():
async with AgentContextManager():
snmpEngine = SnmpEngine()
authData = UsmUserData(
"usr-sha-aes-not-exist",
"authkey1",
"privkey1",
authProtocol=USM_AUTH_HMAC96_SHA,
privProtocol=USM_PRIV_CFB256_AES,
)
errorIndication, errorStatus, errorIndex, varBinds = await getCmd(
snmpEngine,
authData,
await UdpTransportTarget.create(("localhost", AGENT_PORT), retries=0),
ContextData(),
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)

assert isinstance(errorIndication, UnknownUserName)
assert str(errorIndication) == "Unknown USM user"

snmpEngine.closeDispatcher()

0 comments on commit 96676f9

Please sign in to comment.