Add new API commands for managing black/white list #2275

Merged
PeterSurda merged 2 commits from gitea-125 into v0.6 2025-01-15 15:45:08 +01:00
2 changed files with 138 additions and 1 deletions

View File

@ -157,7 +157,8 @@ class ErrorCodes(type):
25: 'Specified address is not a chan address.' 25: 'Specified address is not a chan address.'
' Use deleteAddress API call instead.', ' Use deleteAddress API call instead.',
26: 'Malformed varint in address: ', 26: 'Malformed varint in address: ',
27: 'Message is too long.' 27: 'Message is too long.',
28: 'Invalid parameter'
} }
def __new__(mcs, name, bases, namespace): def __new__(mcs, name, bases, namespace):
@ -558,6 +559,39 @@ class BMRPCDispatcher(object):
'ackData': hexlify(ackdata) 'ackData': hexlify(ackdata)
} }
@staticmethod
def _blackwhitelist_entries(kind='black'):
queryreturn = sqlQuery(
"SELECT label, address FROM %slist WHERE enabled = 1" % kind
)
data = [
{'label': base64.b64encode(
shared.fixPotentiallyInvalidUTF8Data(label)),
'address': address} for label, address in queryreturn
]
return {'addresses': data}
def _blackwhitelist_add(self, address, label, kind='black'):
label = self._decode(label, "base64")
address = addBMIfNotPresent(address)
self._verifyAddress(address)
queryreturn = sqlQuery(
"SELECT address FROM %slist WHERE address=?" % kind, address)
if queryreturn != []:
sqlExecute(
"UPDATE %slist SET label=?, enabled=1 WHERE address=?" % kind,
address)
else:
sqlExecute(
"INSERT INTO %slist VALUES (?,?,1)" % kind, label, address)
queues.UISignalQueue.put(('rerenderBlackWhiteList', ''))
def _blackwhitelist_del(self, address, kind='black'):
address = addBMIfNotPresent(address)
self._verifyAddress(address)
sqlExecute("DELETE FROM %slist WHERE address=?" % kind, address)
queues.UISignalQueue.put(('rerenderBlackWhiteList', ''))
# Request Handlers # Request Handlers
@command('decodeAddress') @command('decodeAddress')
@ -641,6 +675,61 @@ class BMRPCDispatcher(object):
queues.UISignalQueue.put(('rerenderAddressBook', '')) queues.UISignalQueue.put(('rerenderAddressBook', ''))
return "Deleted address book entry for %s if it existed" % address return "Deleted address book entry for %s if it existed" % address
@command('getBlackWhitelistKind')
def HandleGetBlackWhitelistKind(self):
"""Get the list kind set in config - black or white."""
return self.config.get('bitmessagesettings', 'blackwhitelist')
@command('setBlackWhitelistKind')
def HandleSetBlackWhitelistKind(self, kind):
"""Set the list kind used - black or white."""
blackwhitelist_kinds = ('black', 'white')
if kind not in blackwhitelist_kinds:
raise APIError(
28, 'Invalid kind, should be one of %s'
% (blackwhitelist_kinds,))
return self.config.set('bitmessagesettings', 'blackwhitelist', kind)
@command('listBlacklistEntries')
def HandleListBlacklistEntries(self):
"""
Returns dict with a list of all blacklist entries (address and label)
in the *addresses* key.
"""
return self._blackwhitelist_entries('black')
@command('listWhitelistEntries')
def HandleListWhitelistEntries(self):
"""
Returns dict with a list of all whitelist entries (address and label)
in the *addresses* key.
"""
return self._blackwhitelist_entries('white')
@command('addBlacklistEntry')
def HandleAddBlacklistEntry(self, address, label):
"""Add an entry to blacklist. label must be base64 encoded."""
self._blackwhitelist_add(address, label, 'black')
return "Added address %s to blacklist" % address
@command('addWhitelistEntry')
def HandleAddWhitelistEntry(self, address, label):
"""Add an entry to whitelist. label must be base64 encoded."""
self._blackwhitelist_add(address, label, 'white')
return "Added address %s to whitelist" % address
@command('deleteBlacklistEntry')
def HandleDeleteBlacklistEntry(self, address):
"""Delete an entry from blacklist."""
self._blackwhitelist_del(address, 'black')
return "Deleted blacklist entry for %s if it existed" % address
@command('deleteWhitelistEntry')
def HandleDeleteWhitelistEntry(self, address):
"""Delete an entry from whitelist."""
self._blackwhitelist_del(address, 'white')
return "Deleted whitelist entry for %s if it existed" % address
@command('createRandomAddress') @command('createRandomAddress')
def HandleCreateRandomAddress( def HandleCreateRandomAddress(
self, label, eighteenByteRipe=False, totalDifficulty=0, self, label, eighteenByteRipe=False, totalDifficulty=0,

View File

@ -301,6 +301,54 @@ class TestAPI(TestAPIProto):
self.assertEqual( self.assertEqual(
json.loads(self.api.listSubscriptions())['subscriptions'], []) json.loads(self.api.listSubscriptions())['subscriptions'], [])
def test_blackwhitelist(self):
"""Test API commands for managing the black/white list"""
# Initially it's black
self.assertEqual(self.api.getBlackWhitelistKind(), 'black')
# Initially they are empty
self.assertEqual(
json.loads(self.api.listBlacklistEntries()).get('addresses'), [])
self.assertEqual(
json.loads(self.api.listWhitelistEntries()).get('addresses'), [])
# For the Blacklist:
# Add known address
self.api.addBlacklistEntry(
sample_deterministic_addr4,
base64.encodestring('tiger_4')
)
# Check list entry
entry = json.loads(self.api.listBlacklistEntries()).get('addresses')[0]
self.assertEqual(entry['address'], sample_deterministic_addr4)
self.assertEqual(base64.decodestring(entry['label']), 'tiger_4')
# Remove known address
self.api.deleteBlacklistEntry(sample_deterministic_addr4)
self.assertEqual(
json.loads(self.api.listBlacklistEntries()).get('addresses'), [])
# Only two kinds - black and white
six.assertRegex(
self, self.api.setBlackWhitelistKind('yellow'),
r'^API Error 0028:')
# Change kind
self.api.setBlackWhitelistKind('white')
self.assertEqual(self.api.getBlackWhitelistKind(), 'white')
# For the Whitelist:
# Add known address
self.api.addWhitelistEntry(
sample_deterministic_addr4,
base64.encodestring('tiger_4')
)
# Check list entry
entry = json.loads(self.api.listWhitelistEntries()).get('addresses')[0]
self.assertEqual(entry['address'], sample_deterministic_addr4)
self.assertEqual(base64.decodestring(entry['label']), 'tiger_4')
# Remove known address
self.api.deleteWhitelistEntry(sample_deterministic_addr4)
self.assertEqual(
json.loads(self.api.listWhitelistEntries()).get('addresses'), [])
def test_send(self): def test_send(self):
"""Test message sending""" """Test message sending"""
addr = self._add_random_address('random_2') addr = self._add_random_address('random_2')