@ -1,15 +1,20 @@
import hashlib
import time
from pprint import pprint
import socket
from struct import unpack
from network . advanceddispatcher import AdvancedDispatcher
from network . node import Node
import network . asyncore_pollchoose as asyncore
from network . proxy import Proxy , ProxyError , GeneralProxyError
from network . socks5 import Socks5Connection , Socks5Resolver , Socks5AuthError , Socks5Error
from network . socks4a import Socks4aConnection , Socks4aResolver , Socks4aError
from network . tls import TLSDispatcher
import addresses
from bmconfigparser import BMConfigParser
import shared
import protocol
class BMProtoError ( ProxyError ) : pass
@ -30,14 +35,14 @@ class BMConnection(TLSDispatcher):
if address is None and sock is not None :
self . destination = self . addr ( )
self . isOutbound = False
TLS Handshake . __init__ ( self , sock , server_side = True )
TLS Dispatcher . __init__ ( self , sock , server_side = True )
print " received connection in background from %s : %i " % ( self . destination [ 0 ] , self . destination [ 1 ] )
else :
self . destination = address
self . isOutbound = True
self . create_socket ( socket . AF_INET , socket . SOCK_STREAM )
self . connect ( self . destination )
TLS Handshake . __init__ ( self , sock , server_side = False )
TLS Dispatcher . __init__ ( self , sock , server_side = False )
print " connecting in background to %s : %i " % ( self . destination [ 0 ] , self . destination [ 1 ] )
def bm_proto_reset ( self ) :
@ -47,19 +52,22 @@ class BMConnection(TLSDispatcher):
self . checksum = None
self . payload = None
self . invalid = False
self . payloadOffset = 0
def state_init ( self ) :
self . bm_proto_reset ( )
self . write_buf + = protocol . assembleVersionMessage ( self . destination [ 0 ] , self . destination [ 1 ] , ( 1 , ) , False )
self . append_write_buf( protocol . assembleVersionMessage ( self . destination [ 0 ] , self . destination [ 1 ] , ( 1 , ) , False ) )
if True :
print " Sending version ( %i b) " % len ( self . write_buf )
self . set_state ( " bm_header " , 0 )
self . set_state ( " bm_header " )
return False
def state_bm_ready ( self ) :
print " doing bm ready "
self . sendAddr ( )
self . sendBigInv ( )
return True
self . set_state ( " bm_header " )
return False
def state_bm_header ( self ) :
if len ( self . read_buf ) < protocol . Header . size :
@ -101,32 +109,127 @@ class BMConnection(TLSDispatcher):
# else assume the command requires a different state to follow
return True
def decode_payload_string ( self , length ) :
value = self . payload [ self . payloadOffset : self . payloadOffset + length ]
self . payloadOffset + = length
return value
def decode_payload_varint ( self ) :
value , offset = addresses . decodeVarint ( self . payload [ self . payloadOffset : ] )
self . payloadOffset + = offset
return value
def decode_payload_node ( self ) :
services , address , port = self . decode_payload_content ( " Q16sH " )
return Node ( services , address , port )
def decode_payload_content ( self , pattern = " v " ) :
# l = varint indicating the length of the next item
# v = varint (or array)
# H = uint16
# I = uint32
# Q = uint64
# i = net_addr (without time and stream number)
# s = string
# 0-9 = length of the next item
# , = end of array
retval = [ ]
size = 0
insideDigit = False
for i in range ( len ( pattern ) ) :
if pattern [ i ] in " 0123456789 " :
size = size * 10 + int ( pattern [ i ] )
continue
elif pattern [ i ] == " l " :
size = self . decode_payload_varint ( )
continue
if size > 0 :
innerval = [ ]
if pattern [ i ] == " s " :
retval . append ( self . payload [ self . payloadOffset : self . payloadOffset + size ] )
self . payloadOffset + = size
else :
for j in range ( size ) :
if " , " in pattern [ i : ] :
retval . append ( self . decode_payload_content ( pattern [ i : pattern . index ( " , " ) ] ) )
else :
retval . append ( self . decode_payload_content ( pattern [ i : ] ) )
size = 0
else :
if pattern [ i ] == " v " :
retval . append ( self . decode_payload_varint ( ) )
if pattern [ i ] == " i " :
retval . append ( self . decode_payload_node ( ) )
if pattern [ i ] == " H " :
retval . append ( unpack ( " >H " , self . payload [ self . payloadOffset : self . payloadOffset + 2 ] ) [ 0 ] )
self . payloadOffset + = 2
if pattern [ i ] == " I " :
retval . append ( unpack ( " >I " , self . payload [ self . payloadOffset : self . payloadOffset + 4 ] ) [ 0 ] )
self . payloadOffset + = 4
if pattern [ i ] == " Q " :
retval . append ( unpack ( " >Q " , self . payload [ self . payloadOffset : self . payloadOffset + 8 ] ) [ 0 ] )
self . payloadOffset + = 8
return retval
def bm_command_error ( self ) :
fatalStatus , banTime , inventoryVector , errorText = self . decode_payload_content ( " vvlsls " )
def bm_command_getdata ( self ) :
items = self . decode_payload_content ( " l32s " )
#self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough
for i in items :
logger . debug ( ' received getdata request for item: ' + hexlify ( i ) )
if self . objectHashHolderInstance . hasHash ( i ) :
self . antiIntersectionDelay ( )
else :
if i in Inventory ( ) :
self . append_write_buf ( protocol . CreatePacket ( ' object ' , Inventory ( ) [ i ] . payload ) )
else :
#self.antiIntersectionDelay()
logger . warning ( ' %s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it. ' % ( self . peer , ) )
def bm_command_object ( self ) :
lengthOfTimeWeShouldUseToProcessThisMessage = shared . checkAndShareObjectWithPeers ( self . payload )
self . downloadQueue . task_done ( calculateInventoryHash ( self . payload ) )
def bm_command_addr ( self ) :
addresses = self . decode_payload_content ( " lQbQ16sH " )
def bm_command_ping ( self ) :
self . append_write_buf ( protocol . CreatePacket ( ' pong ' ) )
def bm_command_pong ( self ) :
# nothing really
pass
def bm_command_verack ( self ) :
self . verackReceived = True
return True
if self . verackSent :
if self . isSSL :
self . set_state ( " tls_init " , self . payloadLength )
else :
self . set_state ( " bm_ready " , self . payloadLength )
else :
self . set_state ( " bm_header " , self . payloadLength )
self . bm_proto_reset ( )
return False
def bm_command_version ( self ) :
self . remoteProtocolVersion , self . services , self . timestamp , padding1 , self . myExternalIP , padding2 , self . remoteNodeIncomingPort = protocol . VersionPacket . unpack ( self . payload [ : protocol . VersionPacket . size ] )
#self.remoteProtocolVersion, self.services, self.timestamp, padding1, self.myExternalIP, padding2, self.remoteNodeIncomingPort = protocol.VersionPacket.unpack(self.payload[:protocol.VersionPacket.size])
self . remoteProtocolVersion , self . services , self . timestamp , self . sockNode , self . peerNode , self . nonce , self . userAgent , self . streams = self . decode_payload_content ( " IQQiiQlslv " )
self . timeOffset = self . timestamp - int ( time . time ( ) )
print " remoteProtocolVersion: %i " % ( self . remoteProtocolVersion )
print " services: %08X " % ( self . services )
print " time offset: %i " % ( self . timestamp - int ( time . time ( ) ) )
print " my external IP: %s " % ( socket . inet_ntoa ( self . myExternalIP ) )
print " remote node incoming port: %i " % ( self . remoteNodeIncomingPort )
useragentLength , lengthOfUseragentVarint = addresses . decodeVarint ( self . payload [ 80 : 84 ] )
readPosition = 80 + lengthOfUseragentVarint
self . userAgent = self . payload [ readPosition : readPosition + useragentLength ]
readPosition + = useragentLength
print " my external IP: %s " % ( self . sockNode . address )
print " remote node incoming port: %i " % ( self . peerNode . port )
print " user agent: %s " % ( self . userAgent )
if not self . peerValidityChecks ( ) :
# TODO ABORT
return True
self . write_buf + = protocol . CreatePacket ( ' verack ' )
self . append_write_buf( protocol . CreatePacket ( ' verack ' ) )
self . verackSent = True
if ( ( self . services & protocol . NODE_SSL == protocol . NODE_SSL ) and
protocol . haveSSL ( not self . isOutbound ) ) :
@ -141,21 +244,21 @@ class BMConnection(TLSDispatcher):
def peerValidityChecks ( self ) :
if self . remoteProtocolVersion < 3 :
self . write_buf + = protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your is using an old protocol. Closing connection. " )
self . append_write_buf( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your is using an old protocol. Closing connection. " ) )
logger . debug ( ' Closing connection to old protocol version %s , node: %s ' ,
str ( self . remoteProtocolVersion ) , str ( self . peer ) )
return False
if self . timeOffset > 3600 :
self . write_buf + = protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the future compared to mine. Closing connection. " )
self . append_write_buf( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the future compared to mine. Closing connection. " ) )
logger . info ( " %s ' s time is too far in the future ( %s seconds). Closing connection to it. " ,
self . peer , self . timeOffset )
shared . timeOffsetWrongCount + = 1
return False
elif self . timeOffset < - 3600 :
self . write_buf + = protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the past compared to mine. Closing connection. " )
self . append_write_buf( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " Your time is too far in the past compared to mine. Closing connection. " ) )
logger . info ( " %s ' s time is too far in the past (timeOffset %s seconds). Closing connection to it. " ,
self . peer , self . timeOffset )
shared . timeOffsetWrongCount + = 1
@ -163,8 +266,8 @@ class BMConnection(TLSDispatcher):
else :
shared . timeOffsetWrongCount = 0
if len ( self . streams ) == 0 :
self . write_buf + = protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " We don ' t have shared stream interests. Closing connection. " ) ) )
self . append_write_buf( protocol . assembleErrorMessage ( fatal = 2 ,
errorText = " We don ' t have shared stream interests. Closing connection. " ) )
logger . debug ( ' Closed connection to %s because there is no overlapping interest in streams. ' ,
str ( self . peer ) )
return False
@ -174,8 +277,8 @@ class BMConnection(TLSDispatcher):
def sendChunk ( ) :
if numberOfAddressesInAddrMessage == 0 :
return
self . write_buf + = protocol . CreatePacket ( ' addr ' , \
addresses . encodeVarint ( numberOfAddressesInAddrMessage ) + payload ) ) )
self . append_write_buf( protocol . CreatePacket ( ' addr ' , \
addresses . encodeVarint ( numberOfAddressesInAddrMessage ) + payload ) )
# We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the
@ -265,7 +368,7 @@ class BMConnection(TLSDispatcher):
payload = encodeVarint ( objectCount ) + payload
logger . debug ( ' Sending huge inv message with %i objects to just this one peer ' ,
str ( numberOfObjects ) )
self . write_buf + = protocol . CreatePacket ( ' inv ' , payload )
self . append_write_buf( protocol . CreatePacket ( ' inv ' , payload ) )
# Select all hashes for objects in this stream.
bigInvList = { }
@ -335,15 +438,15 @@ if __name__ == "__main__":
direct = BMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
print " loop, state = %s " % ( direct . state )
asyncore . loop ( timeout = 1 , count = 1 )
asyncore . loop ( timeout = 1 0 , count = 1 )
continue
proxy = Socks5BMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
# print "loop, state = %s" % (proxy.state)
asyncore . loop ( timeout = 1 , count = 1 )
asyncore . loop ( timeout = 1 0 , count = 1 )
proxy = Socks4aBMConnection ( host )
while len ( asyncore . socket_map ) > 0 :
# print "loop, state = %s" % (proxy.state)
asyncore . loop ( timeout = 1 , count = 1 )
asyncore . loop ( timeout = 1 0 , count = 1 )