Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/Connection/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
from Crypt import CryptConnection


def ensure_type(instance, desired_type, allow_none=False):
if allow_none and instance == None:
return True
if not isinstance(instance, desired_type):
raise PeerMsgError("Type mismatch. Expect " + repr(desired_type))

class PeerMsgError(Exception):
"""Denotes to a detected, rather than unexpected, error in incoming data."""


class Connection(object):
__slots__ = (
"sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "req_id",
Expand Down Expand Up @@ -159,15 +169,23 @@ def messageLoop(self):
buff = None
for message in self.unpacker:
self.incomplete_buff_recv = 0
if "stream_bytes" in message:
if not isinstance(message, dict):
self.log("Message must be a dict...")
continue
elif "stream_bytes" in message:
self.handleStream(message)
else:
self.handleMessage(message)

message = None

except PeerMsgError, detected_error:
if not self.closed:
self.log("Peer error detected: %s" % detected_error.message)
except Exception, err:
if not self.closed:
self.log("Socket error: %s" % Debug.formatException(err))

self.close("MessageLoop ended") # MessageLoop ended, close connection

# My handshake info
Expand Down Expand Up @@ -234,13 +252,19 @@ def setHandshake(self, handshake):
# Handle incoming message
def handleMessage(self, message):
self.last_message_time = time.time()

if message.get("cmd") == "response": # New style response
ensure_type(message.get("to"), int)
ensure_type(message.get("crypt"), str, allow_none=True)

if message["to"] in self.waiting_requests:
if self.last_send_time and len(self.waiting_requests) == 1:
ping = time.time() - self.last_send_time
self.last_ping_delay = ping
self.waiting_requests[message["to"]].set(message) # Set the response to event
# Note the Type! ~~~~~~~~~~~~~
del self.waiting_requests[message["to"]]

elif message["to"] == 0: # Other peers handshake
ping = time.time() - self.start_time
if config.debug_socket:
Expand All @@ -260,6 +284,7 @@ def handleMessage(self, message):
return

self.setHandshake(message)

else:
self.log("Unknown response: %s" % message)
elif message.get("cmd"): # Handhsake request
Expand All @@ -277,6 +302,10 @@ def handleMessage(self, message):

# Incoming handshake set request
def handleHandshake(self, message):
ensure_type(message.get("req_id"), int)
ensure_type(message.get("params"), dict)
ensure_type(message["params"].get("peer_id"), str)

if config.debug_socket:
self.log("Handshake request: %s" % message)
self.setHandshake(message["params"])
Expand Down