@ -16,13 +16,12 @@ class DummyServer(smtpd.SMTPServer):
else :
self . return_status = b ' return status '
def process_message ( self , peer , mailfrom , rcpttos , data ) :
def process_message ( self , peer , mailfrom , rcpttos , data , * * kw ) :
self . messages . append ( ( peer , mailfrom , rcpttos , data ) )
if data == self . return_status :
return ' 250 Okish '
def process_smtputf8_message ( self , * args , * * kwargs ) :
return ' 250 SMTPUTF8 message okish '
if ' mail_options ' in kw and ' SMTPUTF8 ' in kw [ ' mail_options ' ] :
return ' 250 SMTPUTF8 message okish '
class DummyDispatcherBroken ( Exception ) :
@ -54,22 +53,6 @@ class SMTPDServerTest(unittest.TestCase):
write_line ( b ' DATA ' )
self . assertRaises ( NotImplementedError , write_line , b ' spam \r \n . \r \n ' )
def test_process_smtputf8_message_unimplemented ( self ) :
server = smtpd . SMTPServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) ,
enable_SMTPUTF8 = True )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , enable_SMTPUTF8 = True )
def write_line ( line ) :
channel . socket . queue_recv ( line )
channel . handle_read ( )
write_line ( b ' EHLO example ' )
write_line ( b ' MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8 ' )
write_line ( b ' RCPT To: <spam@example> ' )
write_line ( b ' DATA ' )
self . assertRaises ( NotImplementedError , write_line , b ' spam \r \n . \r \n ' )
def test_decode_data_default_warns ( self ) :
with self . assertWarns ( DeprecationWarning ) :
smtpd . SMTPServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) )
@ -168,7 +151,8 @@ class DebuggingServerTest(unittest.TestCase):
enable_SMTPUTF8 = True )
stdout = s . getvalue ( )
self . assertEqual ( stdout , textwrap . dedent ( """ \
- - - - - SMTPUTF8 MESSAGE FOLLOWS - - - - - -
- - - - - - - - - - MESSAGE FOLLOWS - - - - - - - - - -
mail options : [ ' BODY=8BITMIME ' , ' SMTPUTF8 ' ]
b ' From: test '
b ' X-Peer: peer-address '
b ' '
@ -201,6 +185,109 @@ class TestFamilyDetection(unittest.TestCase):
self . assertEqual ( server . socket . family , socket . AF_INET )
class TestRcptOptionParsing ( unittest . TestCase ) :
error_response = ( b ' 555 RCPT TO parameters not recognized or not '
b ' implemented \r \n ' )
def setUp ( self ) :
smtpd . socket = asyncore . socket = mock_socket
self . old_debugstream = smtpd . DEBUGSTREAM
self . debug = smtpd . DEBUGSTREAM = io . StringIO ( )
def tearDown ( self ) :
asyncore . close_all ( )
asyncore . socket = smtpd . socket = socket
smtpd . DEBUGSTREAM = self . old_debugstream
def write_line ( self , channel , line ) :
channel . socket . queue_recv ( line )
channel . handle_read ( )
def test_params_rejected ( self ) :
server = DummyServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) , decode_data = False )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , decode_data = False )
self . write_line ( channel , b ' EHLO example ' )
self . write_line ( channel , b ' MAIL from: <foo@example.com> size=20 ' )
self . write_line ( channel , b ' RCPT to: <foo@example.com> foo=bar ' )
self . assertEqual ( channel . socket . last , self . error_response )
def test_nothing_accepted ( self ) :
server = DummyServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) , decode_data = False )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , decode_data = False )
self . write_line ( channel , b ' EHLO example ' )
self . write_line ( channel , b ' MAIL from: <foo@example.com> size=20 ' )
self . write_line ( channel , b ' RCPT to: <foo@example.com> ' )
self . assertEqual ( channel . socket . last , b ' 250 OK \r \n ' )
class TestMailOptionParsing ( unittest . TestCase ) :
error_response = ( b ' 555 MAIL FROM parameters not recognized or not '
b ' implemented \r \n ' )
def setUp ( self ) :
smtpd . socket = asyncore . socket = mock_socket
self . old_debugstream = smtpd . DEBUGSTREAM
self . debug = smtpd . DEBUGSTREAM = io . StringIO ( )
def tearDown ( self ) :
asyncore . close_all ( )
asyncore . socket = smtpd . socket = socket
smtpd . DEBUGSTREAM = self . old_debugstream
def write_line ( self , channel , line ) :
channel . socket . queue_recv ( line )
channel . handle_read ( )
def test_with_decode_data_true ( self ) :
server = DummyServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) , decode_data = True )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , decode_data = True )
self . write_line ( channel , b ' EHLO example ' )
for line in [
b ' MAIL from: <foo@example.com> size=20 SMTPUTF8 ' ,
b ' MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME ' ,
b ' MAIL from: <foo@example.com> size=20 BODY=UNKNOWN ' ,
b ' MAIL from: <foo@example.com> size=20 body=8bitmime ' ,
] :
self . write_line ( channel , line )
self . assertEqual ( channel . socket . last , self . error_response )
self . write_line ( channel , b ' MAIL from: <foo@example.com> size=20 ' )
self . assertEqual ( channel . socket . last , b ' 250 OK \r \n ' )
def test_with_decode_data_false ( self ) :
server = DummyServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) , decode_data = False )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , decode_data = False )
self . write_line ( channel , b ' EHLO example ' )
for line in [
b ' MAIL from: <foo@example.com> size=20 SMTPUTF8 ' ,
b ' MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME ' ,
] :
self . write_line ( channel , line )
self . assertEqual ( channel . socket . last , self . error_response )
self . write_line (
channel ,
b ' MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN ' )
self . assertEqual (
channel . socket . last ,
b ' 501 Error: BODY can only be one of 7BIT, 8BITMIME \r \n ' )
self . write_line (
channel , b ' MAIL from: <foo@example.com> size=20 body=8bitmime ' )
self . assertEqual ( channel . socket . last , b ' 250 OK \r \n ' )
def test_with_enable_smtputf8_true ( self ) :
server = DummyServer ( ( support . HOST , 0 ) , ( ' b ' , 0 ) , enable_SMTPUTF8 = True )
conn , addr = server . accept ( )
channel = smtpd . SMTPChannel ( server , conn , addr , enable_SMTPUTF8 = True )
self . write_line ( channel , b ' EHLO example ' )
self . write_line (
channel ,
b ' MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8 ' )
self . assertEqual ( channel . socket . last , b ' 250 OK \r \n ' )
class SMTPDChannelTest ( unittest . TestCase ) :
def setUp ( self ) :
smtpd . socket = asyncore . socket = mock_socket