Merge pull request #661 from Bitmessage/bpeel-ipv6

Bpeel ipv6
This commit is contained in:
Jonathan Warren 2014-04-27 18:10:08 -04:00
commit 0ca27f73f3
6 changed files with 184 additions and 46 deletions

View File

@ -13,6 +13,9 @@ import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdo
# The next 3 are used for the API
import singleton
import os
import socket
import ctypes
from struct import pack
from SimpleXMLRPCServer import SimpleXMLRPCServer
from api import MySimpleXMLRPCRequestHandler
@ -71,6 +74,56 @@ def connectToStream(streamNumber):
a.setup(streamNumber, selfInitiatedConnections)
a.start()
def _fixWinsock():
if not ('win32' in sys.platform) and not ('win64' in sys.platform):
return
# Python 2 on Windows doesn't define a wrapper for
# socket.inet_ntop but we can make one ourselves using ctypes
if not hasattr(socket, 'inet_ntop'):
addressToString = ctypes.windll.ws2_32.WSAAddressToStringA
def inet_ntop(family, host):
if family == socket.AF_INET:
if len(host) != 4:
raise ValueError("invalid IPv4 host")
host = pack("hH4s8s", socket.AF_INET, 0, host, "\0" * 8)
elif family == socket.AF_INET6:
if len(host) != 16:
raise ValueError("invalid IPv6 host")
host = pack("hHL16sL", socket.AF_INET6, 0, 0, host, 0)
else:
raise ValueError("invalid address family")
buf = "\0" * 64
lengthBuf = pack("I", len(buf))
addressToString(host, len(host), None, buf, lengthBuf)
return buf[0:buf.index("\0")]
socket.inet_ntop = inet_ntop
# Same for inet_pton
if not hasattr(socket, 'inet_pton'):
stringToAddress = ctypes.windll.ws2_32.WSAStringToAddressA
def inet_pton(family, host):
buf = "\0" * 28
lengthBuf = pack("I", len(buf))
if stringToAddress(str(host),
int(family),
None,
buf,
lengthBuf) != 0:
raise socket.error("illegal IP address passed to inet_pton")
if family == socket.AF_INET:
return buf[4:8]
elif family == socket.AF_INET6:
return buf[8:24]
else:
raise ValueError("invalid address family")
socket.inet_pton = inet_pton
# These sockopts are needed on for IPv6 support
if not hasattr(socket, 'IPPROTO_IPV6'):
socket.IPPROTO_IPV6 = 41
if not hasattr(socket, 'IPV6_V6ONLY'):
socket.IPV6_V6ONLY = 27
# This thread, of which there is only one, runs the API.
class singleAPI(threading.Thread):
@ -95,6 +148,8 @@ if shared.useVeryEasyProofOfWorkForTesting:
class Main:
def start(self, daemon=False):
_fixWinsock()
shared.daemon = daemon
# is the application already running? If yes then exit.
thisapp = singleton.singleinstance()

View File

@ -55,7 +55,28 @@ class outgoingSynSender(threading.Thread):
shared.alreadyAttemptedConnectionsListLock.release()
timeNodeLastSeen = shared.knownNodes[
self.streamNumber][peer]
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
if peer.host.find(':') == -1:
address_family = socket.AF_INET
else:
address_family = socket.AF_INET6
try:
sock = socks.socksocket(address_family, socket.SOCK_STREAM)
except:
"""
The line can fail on Windows systems which aren't
64-bit compatiable:
File "C:\Python27\lib\socket.py", line 187, in __init__
_sock = _realsocket(family, type, proto)
error: [Errno 10047] An address incompatible with the requested protocol was used
So let us remove the offending address from our knownNodes file.
"""
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][peer]
shared.knownNodesLock.release()
with shared.printLock:
print 'deleting ', peer, 'from shared.knownNodes because it caused a socks.socksocket exception. We must not be 64-bit compatible.'
continue
# This option apparently avoids the TIME_WAIT state so that we
# can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

View File

@ -532,6 +532,32 @@ class receiveDataThread(threading.Thread):
shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
"""
def _checkIPv4Address(self, host, hostFromAddrMessage):
# print 'hostFromAddrMessage', hostFromAddrMessage
if host[0] == '\x7F':
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
return False
if host[0] == '\x0A':
print 'Ignoring IP address in private range:', hostFromAddrMessage
return False
if host[0:2] == '\xC0A8':
print 'Ignoring IP address in private range:', hostFromAddrMessage
return False
return True
def _checkIPv6Address(self, host, hostFromAddrMessage):
if host == ('\x00' * 15) + '\x01':
print 'Ignoring loopback address:', hostFromAddrMessage
return False
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
print 'Ignoring local address:', hostFromAddrMessage
return False
if (ord(host[0]) & 0xfe) == 0xfc:
print 'Ignoring unique local address:', hostFromAddrMessage
return False
return True
# We have received an addr message.
def recaddr(self, data):
#listOfAddressDetailsToBroadcastToPeers = []
@ -551,14 +577,11 @@ class receiveDataThread(threading.Thread):
for i in range(0, numberOfAddressesIncluded):
try:
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
with shared.printLock:
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
continue
fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]
except Exception as err:
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
'ERROR TRYING TO UNPACK recaddr (recaddrHost). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
@ -592,18 +615,19 @@ class receiveDataThread(threading.Thread):
break # giving up on unpacking any more. We should still be connected however.
# print 'Within recaddr(): IP', recaddrIP, ', Port',
# recaddrPort, ', i', i
hostFromAddrMessage = socket.inet_ntoa(data[
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
# print 'hostFromAddrMessage', hostFromAddrMessage
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
continue
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
if fullHost[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
ipv4Host = fullHost[12:]
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET, ipv4Host)
if not self._checkIPv4Address(ipv4Host, hostFromAddrMessage):
continue
else:
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET6, fullHost)
if hostFromAddrMessage == "":
# This can happen on Windows systems which are not 64-bit compatible
# so let us drop the IPv6 address.
continue
if not self._checkIPv6Address(fullHost, hostFromAddrMessage):
continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
@ -688,8 +712,7 @@ class receiveDataThread(threading.Thread):
payload += pack('>I', self.streamNumber)
payload += pack(
'>q', 1) # service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(HOST)
payload += shared.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
for (HOST, PORT), value in addrsInChildStreamLeft.items():
timeLastReceivedMessageFromThisNode = value
@ -700,8 +723,7 @@ class receiveDataThread(threading.Thread):
payload += pack('>I', self.streamNumber * 2)
payload += pack(
'>q', 1) # service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(HOST)
payload += shared.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
for (HOST, PORT), value in addrsInChildStreamRight.items():
timeLastReceivedMessageFromThisNode = value
@ -712,8 +734,7 @@ class receiveDataThread(threading.Thread):
payload += pack('>I', (self.streamNumber * 2) + 1)
payload += pack(
'>q', 1) # service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(HOST)
payload += shared.encodeHost(HOST)
payload += pack('>H', PORT) # remote port
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload

View File

@ -113,8 +113,7 @@ class sendDataThread(threading.Thread):
payload += pack('>I', streamNumber)
payload += pack(
'>q', services) # service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host)
payload += shared.encodeHost(host)
payload += pack('>H', port)
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload

View File

@ -4,6 +4,8 @@ import socket
from class_sendDataThread import *
from class_receiveDataThread import *
import helper_bootstrap
import errno
import re
# Only one singleListener thread will ever exist. It creates the
# receiveDataThread and sendDataThread for each incoming connection. Note
@ -21,6 +23,21 @@ class singleListener(threading.Thread):
def setup(self, selfInitiatedConnections):
self.selfInitiatedConnections = selfInitiatedConnections
def _createListenSocket(self, family):
HOST = '' # Symbolic name meaning all available interfaces
PORT = shared.config.getint('bitmessagesettings', 'port')
sock = socket.socket(family, socket.SOCK_STREAM)
if family == socket.AF_INET6:
# Make sure we can accept both IPv4 and IPv6 connections.
# This is the default on everything apart from Windows
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
# This option apparently avoids the TIME_WAIT state so that we can
# rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((HOST, PORT))
sock.listen(2)
return sock
def run(self):
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'):
time.sleep(1)
@ -35,14 +52,22 @@ class singleListener(threading.Thread):
with shared.printLock:
print 'Listening for incoming connections.'
HOST = '' # Symbolic name meaning all available interfaces
PORT = shared.config.getint('bitmessagesettings', 'port')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# This option apparently avoids the TIME_WAIT state so that we can
# rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((HOST, PORT))
sock.listen(2)
# First try listening on an IPv6 socket. This should also be
# able to accept connections on IPv4. If that's not available
# we'll fall back to IPv4-only.
try:
sock = self._createListenSocket(socket.AF_INET6)
except socket.error, e:
if (isinstance(e.args, tuple) and
e.args[0] in (errno.EAFNOSUPPORT,
errno.EPFNOSUPPORT,
errno.ENOPROTOOPT)):
sock = self._createListenSocket(socket.AF_INET)
else:
raise
# regexp to match an IPv4-mapped IPv6 address
mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$')
while True:
# We typically don't want to accept incoming connections if the user is using a
@ -56,18 +81,29 @@ class singleListener(threading.Thread):
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
time.sleep(10)
a, (HOST, PORT) = sock.accept()
# The following code will, unfortunately, block an incoming
# connection if someone else on the same LAN is already connected
# because the two computers will share the same external IP. This
# is here to prevent connection flooding.
while HOST in shared.connectedHostsList:
with shared.printLock:
print 'We are already connected to', HOST + '. Ignoring connection.'
while True:
a, sockaddr = sock.accept()
(HOST, PORT) = sockaddr[0:2]
# If the address is an IPv4-mapped IPv6 address then
# convert it to just the IPv4 representation
md = mappedAddressRegexp.match(HOST)
if md != None:
HOST = md.group(1)
# The following code will, unfortunately, block an
# incoming connection if someone else on the same LAN
# is already connected because the two computers will
# share the same external IP. This is here to prevent
# connection flooding.
if HOST in shared.connectedHostsList:
a.close()
with shared.printLock:
print 'We are already connected to', HOST + '. Ignoring connection.'
else:
break
a.close()
a, (HOST, PORT) = sock.accept()
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
a.settimeout(20)

View File

@ -96,6 +96,13 @@ def isInSqlInventory(hash):
queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash)
return queryreturn != []
def encodeHost(host):
if host.find(':') == -1:
return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host)
else:
return socket.inet_pton(socket.AF_INET6, host)
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
payload = ''
payload += pack('>L', 2) # protocol version.
@ -104,8 +111,7 @@ def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
payload += pack(
'>q', 1) # boolservices of remote connection; ignored by the remote host.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(remoteHost)
payload += encodeHost(remoteHost)
payload += pack('>H', remotePort) # remote IPv6 and port
payload += pack('>q', 1) # bitflags of the services I offer.