Place obvious bandit nosec comments

This commit is contained in:
Lee Miller 2022-04-29 18:05:48 +03:00
parent b8c2795b82
commit 93c283a467
Signed by untrusted user: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
10 changed files with 24 additions and 22 deletions

View File

@ -62,9 +62,9 @@ import errno
import hashlib
import httplib
import json
import random # nosec
import random
import socket
import subprocess
import subprocess # nosec B404
import time
import xmlrpclib
from binascii import hexlify, unhexlify
@ -240,7 +240,7 @@ class singleAPI(StoppableThread):
if attempt > 0:
logger.warning(
'Failed to start API listener on port %s', port)
port = random.randint(32767, 65535)
port = random.randint(32767, 65535) # nosec B311
se = StoppableRPCServer(
(config.get(
'bitmessagesettings', 'apiinterface'),
@ -266,7 +266,7 @@ class singleAPI(StoppableThread):
if apiNotifyPath:
logger.info('Trying to call %s', apiNotifyPath)
try:
subprocess.call([apiNotifyPath, "startingUp"])
subprocess.call([apiNotifyPath, "startingUp"]) # nosec B603
except OSError:
logger.warning(
'Failed to call %s, removing apinotifypath setting',

View File

@ -7,7 +7,7 @@ processes the network objects
import hashlib
import logging
import random
import subprocess # nosec
import subprocess # nosec B404
import threading
import time
from binascii import hexlify
@ -458,7 +458,7 @@ class objectProcessor(threading.Thread):
for key, cryptorObject in sorted(
shared.myECCryptorObjects.items(),
key=lambda x: random.random()):
key=lambda x: random.random()): # nosec B311
try:
# continue decryption attempts to avoid timing attacks
if initialDecryptionSuccessful:
@ -680,7 +680,8 @@ class objectProcessor(threading.Thread):
apiNotifyPath = config.safeGet(
'bitmessagesettings', 'apinotifypath')
if apiNotifyPath:
subprocess.call([apiNotifyPath, "newMessage"])
subprocess.call( # nosec B603
[apiNotifyPath, "newMessage"])
# Let us now check and see whether our receiving address is
# behaving as a mailing list
@ -776,7 +777,7 @@ class objectProcessor(threading.Thread):
initialDecryptionSuccessful = False
for key, cryptorObject in sorted(
shared.MyECSubscriptionCryptorObjects.items(),
key=lambda x: random.random()):
key=lambda x: random.random()): # nosec B311
try:
# continue decryption attempts to avoid timing attacks
if initialDecryptionSuccessful:
@ -964,7 +965,7 @@ class objectProcessor(threading.Thread):
apiNotifyPath = config.safeGet(
'bitmessagesettings', 'apinotifypath')
if apiNotifyPath:
subprocess.call([apiNotifyPath, "newBroadcast"])
subprocess.call([apiNotifyPath, "newBroadcast"]) # nosec B603
# Display timing data
logger.info(

View File

@ -17,7 +17,7 @@ if not hasattr(sys, 'hexversion') or sys.hexversion < 0x20300F0:
)
import logging # noqa:E402
import subprocess
import subprocess # nosec B404
from importlib import import_module

View File

@ -3,7 +3,7 @@ Select which node to connect to
"""
# pylint: disable=too-many-branches
import logging
import random # nosec
import random
import knownnodes
import protocol
@ -17,7 +17,7 @@ logger = logging.getLogger('default')
def getDiscoveredPeer():
"""Get a peer from the local peer discovery list"""
try:
peer = random.choice(state.discoveredPeers.keys())
peer = random.choice(state.discoveredPeers.keys()) # nosec B311
except (IndexError, KeyError):
raise ValueError
try:
@ -40,11 +40,12 @@ def chooseConnection(stream):
except queue.Empty:
pass
# with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion:
if random.choice((False, True)) and not haveOnion: # nosec B311
# discovered peers are already filtered by allowed streams
return getDiscoveredPeer()
for _ in range(50):
peer = random.choice(knownnodes.knownNodes[stream].keys())
peer = random.choice( # nosec B311
knownnodes.knownNodes[stream].keys())
try:
peer_info = knownnodes.knownNodes[stream][peer]
if peer_info.get('self'):
@ -70,7 +71,7 @@ def chooseConnection(stream):
if rating > 1:
rating = 1
try:
if 0.05 / (1.0 - rating) > random.random():
if 0.05 / (1.0 - rating) > random.random(): # nosec B311
return peer
except ZeroDivisionError:
return peer

View File

@ -140,7 +140,7 @@ class Dandelion: # pylint: disable=old-style-class
"""
try:
# pick a random from available stems
stem = choice(range(len(self.stem)))
stem = choice(range(len(self.stem))) # nosec B311
if self.stem[stem] == parent:
# one stem available and it's the parent
if len(self.stem) == 1:

View File

@ -7,7 +7,7 @@ Manipulations with knownNodes dictionary.
import json
import logging
import os
import pickle
import pickle # nosec B403
import threading
import time
try:

View File

@ -398,7 +398,7 @@ class TCPServer(AdvancedDispatcher):
try:
if attempt > 0:
logger.warning('Failed to bind on port %s', port)
port = random.randint(32767, 65535)
port = random.randint(32767, 65535) # nosec B311
self.bind((host, port))
except socket.error as e:
if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):

View File

@ -13,7 +13,7 @@ Configure tor proxy and hidden service with
"""
import logging
import os
import random # noseq
import random
import sys
import tempfile
@ -79,7 +79,7 @@ def connect_plugin(config):
port = config.safeGetInt('bitmessagesettings', 'socksport', 9050)
for attempt in range(50):
if attempt > 0:
port = random.randint(32767, 65535)
port = random.randint(32767, 65535) # nosec B311
tor_config['SocksPort'] = str(port)
if tor_config.get('DataDirectory'):
control_port = port + 1

View File

@ -56,7 +56,7 @@ OBJECT_I2P = 0x493250
OBJECT_ADDR = 0x61646472
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615))
'>Q', random.randrange(1, 18446744073709551615)) # nosec B311
# Compiled struct for packing/unpacking headers
# New code should use CreatePacket instead of Header.pack

View File

@ -328,7 +328,7 @@ class uPnPThread(StoppableThread):
elif i == 1 and self.extPort:
extPort = self.extPort # try external port from last time next
else:
extPort = randint(32767, 65535)
extPort = randint(32767, 65535) # nosec B311
logger.debug(
"Attempt %i, requesting UPnP mapping for %s:%i on external port %i",
i,