@ -19,12 +19,9 @@ import network.connectionpool
from network . downloadqueue import DownloadQueue
from network . node import Node
import network . asyncore_pollchoose as asyncore
from network . objectracker import ObjectTracker
from network . proxy import Proxy , ProxyError , GeneralProxyError
from network . bmqueues import BMQueues
from network . socks5 import Socks5Connection , Socks5Resolver , Socks5AuthError , Socks5Error
from network . socks4a import Socks4aConnection , Socks4aResolver , Socks4aError
from network . uploadqueue import UploadQueue , UploadElem , AddrUploadQueue , ObjUploadQueue
from network . tls import TLSDispatcher
import addresses
from bmconfigparser import BMConfigParser
@ -42,7 +39,7 @@ class BMProtoInsufficientDataError(BMProtoError): pass
class BMProtoExcessiveDataError ( BMProtoError ) : pass
class BM Connection( TLSDispatcher , BMQueues ) :
class BM Proto( AdvancedDispatcher , ObjectTracker ) :
# ~1.6 MB which is the maximum possible size of an inv message.
maxMessageSize = 1600100
# 2**18 = 256kB is the maximum size of an object payload
@ -51,36 +48,40 @@ class BMConnection(TLSDispatcher, BMQueues):
maxAddrCount = 1000
# protocol specification says max 50000 objects in one inv command
maxObjectCount = 50000
def __init__ ( self , address = None , sock = None ) :
AdvancedDispatcher . __init__ ( self , sock )
self . verackReceived = False
self . verackSent = False
self . lastTx = time . time ( )
self . streams = [ 0 ]
self . fullyEstablished = False
self . connectedAt = 0
self . skipUntil = 0
if address is None and sock is not None :
self . destination = state . Peer ( sock . getpeername ( ) [ 0 ] , sock . getpeername ( ) [ 1 ] )
self . isOutbound = False
TLSDispatcher . __init__ ( self , sock , server_side = True )
self . connectedAt = time . time ( )
print " received connection in background from %s : %i " % ( self . destination . host , self . destination . port )
else :
self . destination = address
self . isOutbound = True
if " : " in address . host :
self . create_socket ( socket . AF_INET6 , socket . SOCK_STREAM )
else :
self . create_socket ( socket . AF_INET , socket . SOCK_STREAM )
self . socket . setsockopt ( socket . SOL_SOCKET , socket . SO_REUSEADDR , 1 )
TLSDispatcher . __init__ ( self , sock , server_side = False )
self . connect ( self . destination )
print " connecting in background to %s : %i " % ( self . destination . host , self . destination . port )
shared . connectedHostsList [ self . destination ] = 0
BMQueues . __init__ ( self )
UISignalQueue . put ( ( ' updateNetworkStatusTab ' , ' no data ' ) )
# address is online if online less than this many seconds ago
addressAlive = 10800
# maximum time offset
maxTimeOffset = 3600
# def __init__(self, address=None, sock=None):
# AdvancedDispatcher.__init__(self, sock)
# self.verackReceived = False
# self.verackSent = False
# self.lastTx = time.time()
# self.streams = [0]
# self.fullyEstablished = False
# self.connectedAt = 0
# self.skipUntil = 0
# if address is None and sock is not None:
# self.destination = state.Peer(sock.getpeername()[0], sock.getpeername()[1])
# self.isOutbound = False
# TLSDispatcher.__init__(self, sock, server_side=True)
# self.connectedAt = time.time()
# #print "received connection in background from %s:%i" % (self.destination.host, self.destination.port)
# else:
# self.destination = address
# self.isOutbound = True
# if ":" in address.host:
# self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
# else:
# self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# TLSDispatcher.__init__(self, sock, server_side=False)
# self.connect(self.destination)
# #print "connecting in background to %s:%i" % (self.destination.host, self.destination.port)
# shared.connectedHostsList[self.destination] = 0
# ObjectTracker.__init__(self)
# UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
def bm_proto_reset ( self ) :
self . magic = None
@ -92,37 +93,6 @@ class BMConnection(TLSDispatcher, BMQueues):
self . payloadOffset = 0
self . object = None
def state_init ( self ) :
self . bm_proto_reset ( )
if self . isOutbound :
self . writeQueue . put ( protocol . assembleVersionMessage ( self . destination . host , self . destination . port , network . connectionpool . BMConnectionPool ( ) . streams , False ) )
print " %s : %i : Sending version " % ( self . destination . host , self . destination . port )
self . set_state ( " bm_header " )
return True
def antiIntersectionDelay ( self , initial = False ) :
# estimated time for a small object to propagate across the whole network
delay = math . ceil ( math . log ( max ( len ( knownnodes . knownNodes [ x ] ) for x in knownnodes . knownNodes ) + 2 , 20 ) ) * ( 0.2 + UploadQueue . queueCount / 2 )
# take the stream with maximum amount of nodes
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
# 0.2 is avg message transmission time
if delay > 0 :
if initial :
self . skipUntil = self . connectedAt + delay
if self . skipUntil > time . time ( ) :
logger . debug ( " Skipping processing for %.2f s " , self . skipUntil - time . time ( ) )
else :
logger . debug ( " Skipping processing due to missing object for %.2f s " , self . skipUntil - time . time ( ) )
self . skipUntil = time . time ( ) + delay
def set_connection_fully_established ( self ) :
UISignalQueue . put ( ( ' updateNetworkStatusTab ' , ' no data ' ) )
self . antiIntersectionDelay ( True )
self . fullyEstablished = True
self . sendAddr ( )
self . sendBigInv ( )
def state_bm_header ( self ) :
#print "%s:%i: header" % (self.destination.host, self.destination.port)
if len ( self . read_buf ) < protocol . Header . size :
@ -137,7 +107,7 @@ class BMConnection(TLSDispatcher, BMQueues):
print " Bad magic "
self . close ( )
return False
if self . payloadLength > BM Connection . maxMessageSize :
if self . payloadLength > BM Proto . maxMessageSize :
self . invalid = True
self . set_state ( " bm_command " , protocol . Header . size )
return True
@ -309,28 +279,18 @@ class BMConnection(TLSDispatcher, BMQueues):
def bm_command_inv ( self ) :
items = self . decode_payload_content ( " L32s " )
if len ( items ) > = BM Connection . maxObjectCount :
if len ( items ) > = BM Proto . maxObjectCount :
logger . error ( " Too many items in inv message! " )
raise BMProtoExcessiveDataError ( )
else :
pass
#print "items in inv: %i" % (len(items))
startTime = time . time ( )
#advertisedSet = set()
for i in items :
#advertisedSet.add(i)
self . handleReceivedObj ( i )
#objectsNewToMe = advertisedSet
#for stream in self.streams:
#objectsNewToMe -= Inventory().hashes_by_stream(stream)
logger . info ( ' inv message lists %i objects. Of those %i are new to me. It took %f seconds to figure that out. ' , len ( items ) , len ( self . objectsNewToMe ) , time . time ( ) - startTime )
self . receiveQueue . put ( ( " inv " , i ) )
self . handleReceivedInventory ( i )
payload = addresses . encodeVarint ( len ( self . objectsNewToMe ) ) + ' ' . join ( self . objectsNewToMe . keys ( ) )
self . writeQueue . put ( protocol . CreatePacket ( ' getdata ' , payload ) )
# for i in random.sample(self.objectsNewToMe, len(self.objectsNewToMe)):
# DownloadQueue().put(i)
return True
def bm_command_object ( self ) :
@ -338,7 +298,7 @@ class BMConnection(TLSDispatcher, BMQueues):
nonce , expiresTime , objectType , version , streamNumber = self . decode_payload_content ( " QQIvv " )
self . object = BMObject ( nonce , expiresTime , objectType , version , streamNumber , self . payload )
if len ( self . payload ) - self . payloadOffset > BM Connection . maxObjectPayloadSize :
if len ( self . payload ) - self . payloadOffset > BM Proto . maxObjectPayloadSize :
logger . info ( ' The payload length of this object is too large ( %s bytes). Ignoring it. ' % len ( self . payload ) - self . payloadOffset )
raise BMProtoExcessiveDataError ( )
@ -368,20 +328,23 @@ class BMConnection(TLSDispatcher, BMQueues):
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
return True
def _decode_addr ( self ) :
return self . decode_payload_content ( " lQIQ16sH " )
def bm_command_addr ( self ) :
addresses = self . decode_payload_content ( " lQIQ16sH " )
import pprint
addresses = self . _decode_addr ( )
for i in addresses :
seenTime , stream , services , ip , port = i
decodedIP = protocol . checkIPAddress ( ip )
if stream not in state . streamsInWhichIAmParticipating :
continue
#print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream]))
if decodedIP is not False and seenTime > time . time ( ) - 10800 :
if decodedIP is not False and seenTime > time . time ( ) - BMProto . addressAlive :
peer = state . Peer ( decodedIP , port )
if peer in knownnodes . knownNodes [ stream ] and knownnodes . knownNodes [ stream ] [ peer ] > seenTime :
continue
knownnodes . knownNodes [ stream ] [ peer ] = seenTime
AddrUploadQueue ( ) . put ( ( stream , peer ) )
return True
def bm_command_portcheck ( self ) :
@ -411,14 +374,15 @@ class BMConnection(TLSDispatcher, BMQueues):
def bm_command_version ( self ) :
#self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size])
self . remoteProtocolVersion , self . services , self . timestamp , self . sockNode , self . peerNode , self . nonce , self . userAgent , self . streams = self . decode_payload_content ( " IQQiiQlslv " )
self . nonce = struct . pack ( ' >Q ' , self . nonce )
self . timeOffset = self . timestamp - int ( time . time ( ) )
print " remoteProtocolVersion: %i " % ( self . remoteProtocolVersion )
print " services: %08X " % ( self . services )
print " time offset: %i " % ( self . timestamp - int ( time . time ( ) ) )
print " my external IP: %s " % ( self . sockNode . host )
print " remote node incoming port: %i " % ( self . peerNode . port )
print " user agent: %s " % ( self . userAgent )
print " streams: [ %s ] " % ( " , " . join ( map ( str , self . streams ) ) )
#print "remoteProtocolVersion: %i" % (self.remoteProtocolVersion )
#print "services: %08X" % (self.services )
#print "time offset: %i" % (self.timestamp - int(time.time()) )
#print "my external IP: %s" % (self.sockNode.host )
#print "remote node incoming port: %i" % (self.peerNode.port )
#print "user agent: %s" % (self.userAgent )
#print "streams: [%s]" % (",".join(map(str,self.streams)) )
if not self . peerValidityChecks ( ) :
# TODO ABORT
return True
@ -446,20 +410,20 @@ class BMConnection(TLSDispatcher, BMQueues):
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your is using an old protocol. Closing connection. " ) )
logger . debug ( ' Closing connection to old protocol version %s , node: %s ' ,
str ( self . remoteProtocolVersion ) , str ( self . peer ) )
str ( self . remoteProtocolVersion ) , str ( self . destination ) )
return False
if self . timeOffset > 3600 :
if self . timeOffset > BMProto . maxTimeOffset :
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the future compared to mine. Closing connection. " ) )
logger . info ( " %s ' s time is too far in the future ( %s seconds). Closing connection to it. " ,
self . peer , self . timeOffset )
self . destination , self . timeOffset )
shared . timeOffsetWrongCount + = 1
return False
elif self . timeOffset < - 3600 :
elif self . timeOffset < - BMProto . maxTimeOffset :
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the past compared to mine. Closing connection. " ) )
logger . info ( " %s ' s time is too far in the past (timeOffset %s seconds). Closing connection to it. " ,
self . peer , self . timeOffset )
self . destination , self . timeOffset )
shared . timeOffsetWrongCount + = 1
return False
else :
@ -468,7 +432,7 @@ class BMConnection(TLSDispatcher, BMQueues):
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " We don ' t have shared stream interests. Closing connection. " ) )
logger . debug ( ' Closed connection to %s because there is no overlapping interest in streams. ' ,
str ( self . peer ) )
str ( self . destination ) )
return False
if self . destination in network . connectionpool . BMConnectionPool ( ) . inboundConnections :
try :
@ -476,218 +440,63 @@ class BMConnection(TLSDispatcher, BMQueues):
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Too many connections from your IP. Closing connection. " ) )
logger . debug ( ' Closed connection to %s because we are already connected to that IP. ' ,
str ( self . peer ) )
str ( self . destination ) )
return False
except :
pass
if self . nonce == protocol . eightBytesOfRandomDataUsedToDetectConnectionsToSelf :
self . writeQueue . put ( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " I ' m connected to myself. Closing connection. " ) )
logger . debug ( " Closed connection to %s because I ' m connected to myself. " ,
str ( self . destination ) )
return True
def sendAddr ( self ) :
def sendChunk ( ) :
if addressCount == 0 :
return
self . writeQueue . put ( protocol . CreatePacket ( ' addr ' , \
addresses . encodeVarint ( addressCount ) + payload ) )
# We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the
# left child stream, and 250 from the right child stream.
maxAddrCount = BMConfigParser ( ) . safeGetInt ( " bitmessagesettings " , " maxaddrperstreamsend " , 500 )
# init
addressCount = 0
payload = b ' '
for stream in self . streams :
addrsInMyStream = { }
addrsInChildStreamLeft = { }
addrsInChildStreamRight = { }
with knownnodes . knownNodesLock :
if len ( knownnodes . knownNodes [ stream ] ) > 0 :
filtered = { k : v for k , v in knownnodes . knownNodes [ stream ] . items ( )
if v > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) }
elemCount = len ( filtered )
if elemCount > maxAddrCount :
elemCount = maxAddrCount
# only if more recent than 3 hours
addrsInMyStream = random . sample ( filtered . items ( ) , elemCount )
# sent 250 only if the remote isn't interested in it
if len ( knownnodes . knownNodes [ stream * 2 ] ) > 0 and stream not in self . streams :
filtered = { k : v for k , v in knownnodes . knownNodes [ stream * 2 ] . items ( )
if v > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) }
elemCount = len ( filtered )
if elemCount > maxAddrCount / 2 :
elemCount = int ( maxAddrCount / 2 )
addrsInChildStreamLeft = random . sample ( filtered . items ( ) , elemCount )
if len ( knownnodes . knownNodes [ ( stream * 2 ) + 1 ] ) > 0 and stream not in self . streams :
filtered = { k : v for k , v in knownnodes . knownNodes [ stream * 2 + 1 ] . items ( )
if v > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) }
elemCount = len ( filtered )
if elemCount > maxAddrCount / 2 :
elemCount = int ( maxAddrCount / 2 )
addrsInChildStreamRight = random . sample ( filtered . items ( ) , elemCount )
for ( HOST , PORT ) , timeLastReceivedMessageFromThisNode in addrsInMyStream :
addressCount + = 1
payload + = struct . pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = struct . pack ( ' >I ' , stream )
payload + = struct . pack (
' >q ' , 1 ) # service bit flags offered by this node
payload + = protocol . encodeHost ( HOST )
payload + = struct . pack ( ' >H ' , PORT ) # remote port
if addressCount > = BMConnection . maxAddrCount :
sendChunk ( )
payload = b ' '
addressCount = 0
for ( HOST , PORT ) , timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft :
addressCount + = 1
payload + = struct . pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = struct . pack ( ' >I ' , stream * 2 )
payload + = struct . pack (
' >q ' , 1 ) # service bit flags offered by this node
payload + = protocol . encodeHost ( HOST )
payload + = struct . pack ( ' >H ' , PORT ) # remote port
if addressCount > = BMConnection . maxAddrCount :
sendChunk ( )
payload = b ' '
addressCount = 0
for ( HOST , PORT ) , timeLastReceivedMessageFromThisNode in addrsInChildStreamRight :
addressCount + = 1
payload + = struct . pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = struct . pack ( ' >I ' , ( stream * 2 ) + 1 )
payload + = struct . pack (
' >q ' , 1 ) # service bit flags offered by this node
payload + = protocol . encodeHost ( HOST )
payload + = struct . pack ( ' >H ' , PORT ) # remote port
if addressCount > = BMConnection . maxAddrCount :
sendChunk ( )
payload = b ' '
addressCount = 0
# flush
sendChunk ( )
def sendBigInv ( self ) :
def sendChunk ( ) :
if objectCount == 0 :
return
logger . debug ( ' Sending huge inv message with %i objects to just this one peer ' , objectCount )
self . writeQueue . put ( protocol . CreatePacket ( ' inv ' , addresses . encodeVarint ( objectCount ) + payload ) )
# Select all hashes for objects in this stream.
bigInvList = { }
for stream in self . streams :
for hash in Inventory ( ) . unexpired_hashes_by_stream ( stream ) :
bigInvList [ hash ] = 0
# for hash in ObjUploadQueue().streamHashes(stream):
# try:
# del bigInvList[hash]
# except KeyError:
# pass
objectCount = 0
payload = b ' '
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash , storedValue in bigInvList . items ( ) :
payload + = hash
objectCount + = 1
if objectCount > = BMConnection . maxObjectCount :
self . sendChunk ( )
payload = b ' '
objectCount = 0
# flush
sendChunk ( )
@staticmethod
def assembleAddr ( peerList ) :
if type ( peerList ) is state . Peer :
peerList = ( peerList )
# TODO handle max length, now it's done by upper layers
payload = addresses . encodeVarint ( len ( peerList ) )
for address in peerList :
stream , peer , timestamp = address
payload + = struct . pack (
' >Q ' , timestamp ) # 64-bit time
payload + = struct . pack ( ' >I ' , stream )
payload + = struct . pack (
' >q ' , 1 ) # service bit flags offered by this node
payload + = protocol . encodeHost ( peer . host )
payload + = struct . pack ( ' >H ' , peer . port ) # remote port
return protocol . CreatePacket ( ' addr ' , payload )
def handle_connect_event ( self ) :
try :
asyncore . dispatcher . handle_connect_event ( self )
self . connectedAt = time . time ( )
except socket . error as e :
print " %s : %i : socket error: %s " % ( self . destination . host , self . destination . port , str ( e ) )
#print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self . close ( )
def handle_read_event ( self ) :
try :
asyncore . dispatcher . handle_read_event ( self )
except socket . error as e :
print " %s : %i : socket error: %s " % ( self . destination . host , self . destination . port , str ( e ) )
#print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self . close ( )
def handle_write_event ( self ) :
try :
asyncore . dispatcher . handle_write_event ( self )
except socket . error as e :
print " %s : %i : socket error: %s " % ( self . destination . host , self . destination . port , str ( e ) )
#print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self . close ( )
def close ( self , reason = None ) :
self . set_state ( " close " )
if reason is None :
print " %s : %i : closing " % ( self . destination . host , self . destination . port )
#traceback.print_stack()
else :
print " %s : %i : closing, %s " % ( self . destination . host , self . destination . port , reason )
# if reason is None:
# print "%s:%i: closing" % (self.destination.host, self.destination.port)
# #traceback.print_stack()
# else:
# print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason)
network . connectionpool . BMConnectionPool ( ) . removeConnection ( self )
asyncore . dispatcher . close ( self )
class Socks5BMConnection ( Socks5Connection , BMConnection ) :
def __init__ ( self , address ) :
Socks5Connection . __init__ ( self , address = address )
def state_socks_handshake_done ( self ) :
BMConnection . state_init ( self )
return False
class Socks4aBMConnection ( Socks4aConnection , BMConnection ) :
def __init__ ( self , address ) :
Socks4aConnection . __init__ ( self , address = address )
def state_socks_handshake_done ( self ) :
BMConnection . state_init ( self )
return False
class BMServer ( AdvancedDispatcher ) :
def __init__ ( self , host = ' 127.0.0.1 ' , port = 8444 ) :
if not hasattr ( self , ' _map ' ) :
AdvancedDispatcher . __init__ ( self )
self . create_socket ( socket . AF_INET , socket . SOCK_STREAM )
self . set_reuse_addr ( )
self . bind ( ( host , port ) )
self . listen ( 5 )
def handle_accept ( self ) :
pair = self . accept ( )
if pair is not None :
sock , addr = pair
try :
network . connectionpool . BMConnectionPool ( ) . addConnection ( BMConnection ( sock = sock ) )
except socket . errno :
pass
if __name__ == " __main__ " :
# initial fill
for host in ( ( " 127.0.0.1 " , 8448 ) , ) :
direct = BMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
print " loop, state = %s " % ( direct . state )
asyncore . loop ( timeout = 10 , count = 1 )
continue
proxy = Socks5BMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
# print "loop, state = %s" % (proxy.state)
asyncore . loop ( timeout = 10 , count = 1 )
proxy = Socks4aBMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
# print "loop, state = %s" % (proxy.state)
asyncore . loop ( timeout = 10 , count = 1 )
AdvancedDispatcher . close ( self )