# -*- test-case-name: twisted.conch.test.test_cftp -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE file for details.

"""
Tests for L{twisted.conch.scripts.cftp}.
"""

import locale
import time, sys, os, operator, getpass, struct
from io import BytesIO

from twisted.python.filepath import FilePath
from twisted.python.reflect import requireModule
from zope.interface import implementer

pyasn1 = requireModule('pyasn1')
cryptography = requireModule('cryptography')
unix = requireModule('twisted.conch.unix')

_reason = None
if cryptography and pyasn1:
    try:
        from twisted.conch.scripts import cftp
        from twisted.conch.scripts.cftp import SSHSession
        from twisted.conch.ssh import filetransfer
        from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar
        from twisted.conch.test import test_ssh, test_conch
        from twisted.conch.test.test_conch import FakeStdio
    except ImportError:
        pass

from twisted.conch import ls
from twisted.conch.interfaces import ISFTPFile
from twisted.conch.test.test_filetransfer import SFTPTestBase
from twisted.conch.test.test_filetransfer import FileTransferTestAvatar
from twisted.cred import portal
from twisted.internet import reactor, protocol, interfaces, defer, error
from twisted.internet.utils import getProcessOutputAndValue, getProcessValue
from twisted.python import log
from twisted.python.compat import _PY3, unicode
from twisted.python.fakepwd import UserDatabase
from twisted.test.proto_helpers import StringTransport
from twisted.internet.task import Clock
from twisted.trial.unittest import TestCase




class SSHSessionTests(TestCase):
    """
    Tests for L{twisted.conch.scripts.cftp.SSHSession}.
    """
    def test_eofReceived(self):
        """
        L{twisted.conch.scripts.cftp.SSHSession.eofReceived} loses the write
        half of its stdio connection.
        """
        stdio = FakeStdio()
        channel = SSHSession()
        channel.stdio = stdio
        channel.eofReceived()
        self.assertTrue(stdio.writeConnLost)



class ListingTests(TestCase):
    """
    Tests for L{lsLine}, the function which generates an entry for a file or
    directory in an SFTP I{ls} command's output.
    """
    if getattr(time, 'tzset', None) is None:
        skip = "Cannot test timestamp formatting code without time.tzset"


    def setUp(self):
        """
        Patch the L{ls} module's time function so the results of L{lsLine} are
        deterministic.
        """
        self.now = 123456789
        def fakeTime():
            return self.now
        self.patch(ls, 'time', fakeTime)

        # Make sure that the timezone ends up the same after these tests as
        # it was before.
        if 'TZ' in os.environ:
            self.addCleanup(operator.setitem, os.environ, 'TZ', os.environ['TZ'])
            self.addCleanup(time.tzset)
        else:
            def cleanup():
                # os.environ.pop is broken!  Don't use it!  Ever!  Or die!
                try:
                    del os.environ['TZ']
                except KeyError:
                    pass
                time.tzset()
            self.addCleanup(cleanup)


    def _lsInTimezone(self, timezone, stat):
        """
        Call L{ls.lsLine} after setting the timezone to C{timezone} and return
        the result.
        """
        # Set the timezone to a well-known value so the timestamps are
        # predictable.
        os.environ['TZ'] = timezone
        time.tzset()
        return ls.lsLine('foo', stat)


    def test_oldFile(self):
        """
        A file with an mtime six months (approximately) or more in the past has
        a listing including a low-resolution timestamp.
        """
        # Go with 7 months.  That's more than 6 months.
        then = self.now - (60 * 60 * 24 * 31 * 7)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Apr 26  1973 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Apr 27  1973 foo')


    def test_oldSingleDigitDayOfMonth(self):
        """
        A file with a low-resolution timestamp which falls on a day of the
        month which can be represented by one decimal digit is formatted with
        one padding 0 to preserve the columns which come after it.
        """
        # A point about 7 months in the past, tweaked to fall on the first of a
        # month so we test the case we want to test.
        then = self.now - (60 * 60 * 24 * 31 * 7) + (60 * 60 * 24 * 5)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 May 01  1973 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 May 02  1973 foo')


    def test_newFile(self):
        """
        A file with an mtime fewer than six months (approximately) in the past
        has a listing including a high-resolution timestamp excluding the year.
        """
        # A point about three months in the past.
        then = self.now - (60 * 60 * 24 * 31 * 3)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Aug 28 17:33 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Aug 29 09:33 foo')


    def test_localeIndependent(self):
        """
        The month name in the date is locale independent.
        """
        # A point about three months in the past.
        then = self.now - (60 * 60 * 24 * 31 * 3)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        # Fake that we're in a language where August is not Aug (e.g.: Spanish)
        currentLocale = locale.getlocale()
        locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
        self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale)

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Aug 28 17:33 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Aug 29 09:33 foo')

    # If the alternate locale is not available, the previous test is
    # skipped. Install this locale for it to run.
    currentLocale = locale.getlocale()
    try:
        try:
            locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
        except locale.Error:
            test_localeIndependent.skip = "The es_AR.UTF8 locale is not installed."
        else:
            locale.setlocale(locale.LC_ALL, currentLocale)
    except:
        pass


    def test_newSingleDigitDayOfMonth(self):
        """
        A file with a high-resolution timestamp which falls on a day of the
        month which can be represented by one decimal digit is formatted with
        one padding 0 to preserve the columns which come after it.
        """
        # A point about three months in the past, tweaked to fall on the first
        # of a month so we test the case we want to test.
        then = self.now - (60 * 60 * 24 * 31 * 3) + (60 * 60 * 24 * 4)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Sep 01 17:33 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Sep 02 09:33 foo')



class InMemorySSHChannel(StringTransport, object):
    """
    Minimal implementation of an L{SSHChannel}-like class which only reads
    and writes data from memory.
    """

    def __init__(self, conn):
        """
        @param conn: The SSH connection associated with this channel.
        @type conn: L{SSHConnection}
        """
        self.conn = conn
        self.localClosed = 0
        super(InMemorySSHChannel, self).__init__()



class FilesystemAccessExpectations(object):
    """
    A test helper used to support expected filesystem access.
    """

    def __init__(self):
        self._cache = {}


    def put(self, path, flags, stream):
        """
        Store a stream so that it can later be retrieved with L{pop}.

        @param path: Path at which the stream is requested.
        @type path: L{str}

        @param flags: Flags with which the stream is requested.
        @type flags: L{int}

        @param stream: A stream.
        @type stream: C{File}
        """
        self._cache[(path, flags)] = stream


    def pop(self, path, flags):
        """
        Remove a stream from the memory.

        @param path: Path at which the stream is requested.
        @type path: L{str}

        @param flags: Flags with which the stream is requested.
        @type flags: L{int}

        @return: A stream.
        @rtype: C{File}
        """
        return self._cache.pop((path, flags))



class InMemorySFTPClient(object):
    """
    A L{filetransfer.FileTransferClient} which does filesystem operations in
    memory, without touching the local disc or the network interface.

    @ivar _availableFiles: File like objects which are available to the SFTP
        client.
    @type _availableFiles: L{FilesystemAccessExpectations}
    """

    def __init__(self, availableFiles):
        self.transport = InMemorySSHChannel(self)
        self.options = {
            'requests': 1,
            'buffersize': 10,
            }
        self._availableFiles = availableFiles


    def openFile(self, filename, flags, attrs):
        """
        @see: L{filetransfer.FileTransferClient.openFile}.

        Retrieve and remove cached file based on flags.
        """
        return self._availableFiles.pop(filename, flags)



@implementer(ISFTPFile)
class InMemoryRemoteFile(BytesIO):
    """
    An L{ISFTPFile} which handles all data in memory.
    """

    def __init__(self, name):
        """
        @param name: Name of this file.
        @type name: L{str}
        """
        self.name = name
        BytesIO.__init__(self)


    def writeChunk(self, start, data):
        """
        @see: L{ISFTPFile.writeChunk}
        """
        self.seek(start)
        self.write(data)
        return defer.succeed(self)


    def close(self):
        """
        @see: L{ISFTPFile.close}

        Keeps data after file was closed to help with testing.
        """
        self._closed = True


    def getvalue(self):
        """
        Get current data of file.

        Allow reading data even when file is closed.
        """
        return BytesIO.getvalue(self)



class StdioClientTests(TestCase):
    """
    Tests for L{cftp.StdioClient}.
    """
    def setUp(self):
        """
        Create a L{cftp.StdioClient} hooked up to dummy transport and a fake
        user database.
        """
        self.fakeFilesystem = FilesystemAccessExpectations()
        sftpClient = InMemorySFTPClient(self.fakeFilesystem)
        self.client = cftp.StdioClient(sftpClient)
        self.client.currentDirectory = '/'
        self.database = self.client._pwd = UserDatabase()
        # Use a fixed width for all tests so that we get the same results when
        # running these tests from different terminals.
        # Run tests in a wide console so that all items are delimited by at
        # least one space character.
        self.setKnownConsoleSize(500, 24)
        # Intentionally bypassing makeConnection - that triggers some code
        # which uses features not provided by our dumb Connection fake.
        self.client.transport = self.client.client.transport


    def test_exec(self):
        """
        The I{exec} command runs its arguments locally in a child process
        using the user's shell.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
            sys.executable)

        d = self.client._dispatchCommand("exec print(1 + 2)")
        d.addCallback(self.assertEqual, b"3\n")
        return d


    def test_execWithoutShell(self):
        """
        If the local user has no shell, the I{exec} command runs its arguments
        using I{/bin/sh}.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '')

        d = self.client._dispatchCommand("exec echo hello")
        d.addCallback(self.assertEqual, b"hello\n")
        return d


    def test_bang(self):
        """
        The I{exec} command is run for lines which start with C{"!"}.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
            '/bin/sh')

        d = self.client._dispatchCommand("!echo hello")
        d.addCallback(self.assertEqual, b"hello\n")
        return d


    def setKnownConsoleSize(self, width, height):
        """
        For the duration of this test, patch C{cftp}'s C{fcntl} module to return
        a fixed width and height.

        @param width: the width in characters
        @type width: L{int}
        @param height: the height in characters
        @type height: L{int}
        """
        # Local import to avoid win32 issues.
        import tty
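        # FakeFcntl.ioctl has its own C{self}, so keep a reference to the
        # test case for reporting failures.
        testCase = self
        class FakeFcntl(object):
            def ioctl(self, fd, opt, mutate):
                if opt != tty.TIOCGWINSZ:
                    testCase.fail("Only window-size queries supported.")
                return struct.pack("4H", height, width, 0, 0)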
        self.patch(cftp, "fcntl", FakeFcntl())


    def test_printProgressBarReporting(self):
        """
        L{StdioClient._printProgressBar} prints a progress description,
        including percent done, amount transferred, transfer rate, and time
        remaining, all based on the given start time, the given
        L{FileWrapper}'s progress information and the reactor's current time.
        """
        # Use a short, known console width because this simple test doesn't
        # need to test the console padding.
        self.setKnownConsoleSize(10, 34)
        clock = self.client.reactor = Clock()
        wrapped = BytesIO(b"x")
        wrapped.name = b"sample"
        wrapper = cftp.FileWrapper(wrapped)
        wrapper.size = 1024 * 10
        startTime = clock.seconds()
        clock.advance(2.0)
        wrapper.total += 4096

        self.client._printProgressBar(wrapper, startTime)

        if _PY3:
            result = b"\rb'sample' 40% 4.0kB 2.0kBps 00:03 "
        else:
            result = "\rsample 40% 4.0kB 2.0kBps 00:03 "
        self.assertEqual(self.client.transport.value(), result)


    def test_printProgressBarNoProgress(self):
        """
        L{StdioClient._printProgressBar} prints a progress description that
        indicates 0 bytes transferred if no bytes have been transferred and no
        time has passed.
        """
        self.setKnownConsoleSize(10, 34)
        clock = self.client.reactor = Clock()
        wrapped = BytesIO(b"x")
        wrapped.name = b"sample"
        wrapper = cftp.FileWrapper(wrapped)
        startTime = clock.seconds()

        self.client._printProgressBar(wrapper, startTime)

        if _PY3:
            result = b"\rb'sample'  0% 0.0B 0.0Bps 00:00 "
        else:
            result = "\rsample  0% 0.0B 0.0Bps 00:00 "
        self.assertEqual(self.client.transport.value(), result)


    def test_printProgressBarEmptyFile(self):
        """
        Print the progress for empty files.
        """
        self.setKnownConsoleSize(10, 34)
        wrapped = BytesIO()
        wrapped.name = b'empty-file'
        wrapper = cftp.FileWrapper(wrapped)

        self.client._printProgressBar(wrapper, 0)

        if _PY3:
            result = b"\rb'empty-file'100% 0.0B 0.0Bps 00:00 "
        else:
            result = "\rempty-file100% 0.0B 0.0Bps 00:00 "
        self.assertEqual(result, self.client.transport.value())


    def test_getFilenameEmpty(self):
        """
        Returns empty value for both filename and remaining data.
        """
        result = self.client._getFilename('  ')

        self.assertEqual(('', ''), result)


    def test_getFilenameOnlyLocal(self):
        """
        Returns empty value for remaining data when line contains
        only a filename.
        """
        result = self.client._getFilename('only-local')

        self.assertEqual(('only-local', ''), result)


    def test_getFilenameNotQuoted(self):
        """
        Returns filename and remaining data stripped of leading and trailing
        spaces.
        """
        result = self.client._getFilename(' local  remote file  ')

        self.assertEqual(('local', 'remote file'), result)


    def test_getFilenameQuoted(self):
        """
        Returns filename and remaining data not stripped of leading and
        trailing spaces when quoted paths are requested.
        """
        result = self.client._getFilename(' " local file "  " remote  file " ')

        self.assertEqual((' local file ', '" remote  file "'), result)


    def makeFile(self, path=None, content=b''):
        """
        Create a local file and return its path.

        When `path` is L{None}, it will create a new temporary file.

        @param path: Optional path for the new file.
        @type path: L{str}

        @param content: Content to be written in the new file.
        @type content: L{bytes}

        @return: Path to the newly created file.
        """
        if path is None:
            path = self.mktemp()
        with open(path, 'wb') as file:
            file.write(content)
        return path


    def checkPutMessage(self, transfers, randomOrder=False):
        """
        Check output of cftp client for a put request.

        @param transfers: List with tuple of (local, remote, progress).
        @param randomOrder: When set to C{True}, it will ignore the order
            in which put responses are received.
        """
        output = self.client.transport.value()
        if _PY3:
            output = output.decode("utf-8")
        output = output.split('\n\r')

        expectedOutput = []
        actualOutput = []

        for local, remote, expected in transfers:
            # For each transfer we have a list of reported progress which
            # ends with the final message informing that file was transferred.
            expectedTransfer = []
            for line in expected:
                expectedTransfer.append('%s %s' % (local, line))
            expectedTransfer.append('Transferred %s to %s' % (local, remote))
            expectedOutput.append(expectedTransfer)

            progressParts = output.pop(0).strip('\r').split('\r')
            actual = progressParts[:-1]

            last = progressParts[-1].strip('\n').split('\n')
            actual.extend(last)

            actualTransfer = []
            # Each progress update is on its own line, with the summary on
            # the last line. The summary is copied over as-is at the end.
            for line in actual[:-1]:
                # Output line is in the format
                # NAME PROGRESS_PERCENTAGE PROGRESS_BYTES SPEED ETA.
                # For testing we only care about the
                # PROGRESS_PERCENTAGE and PROGRESS_BYTES values.

                # Ignore SPEED and ETA.
                line = line.strip().rsplit(' ', 2)[0]
                # NAME can be followed by a lot of spaces so we need to
                # reduce them to single space.
                line = line.strip().split(' ', 1)
                actualTransfer.append('%s %s' % (line[0], line[1].strip()))
            actualTransfer.append(actual[-1])
            actualOutput.append(actualTransfer)

        if randomOrder:
            self.assertEqual(sorted(expectedOutput), sorted(actualOutput))
        else:
            self.assertEqual(expectedOutput, actualOutput)

        self.assertEqual(
            0, len(output),
            'There are still put responses which were not checked.',
            )


    def test_cmd_PUTSingleNoRemotePath(self):
        """
        A name based on local path is used when remote path is not
        provided.

        The progress is updated while chunks are transferred.
        """
        content = b'Test\r\nContent'
        localPath = self.makeFile(content=content)
        flags = (
            filetransfer.FXF_WRITE |
            filetransfer.FXF_CREAT |
            filetransfer.FXF_TRUNC
            )
        remoteName = os.path.join('/', os.path.basename(localPath))
        remoteFile = InMemoryRemoteFile(remoteName)
        self.fakeFilesystem.put(remoteName, flags, defer.succeed(remoteFile))
        self.client.client.options['buffersize'] = 10

        deferred = self.client.cmd_PUT(localPath)
        self.successResultOf(deferred)

        self.assertEqual(content, remoteFile.getvalue())
        self.assertTrue(remoteFile._closed)
        self.checkPutMessage(
            [(localPath, remoteName,
                ['76% 10.0B', '100% 13.0B', '100% 13.0B'])])


    def test_cmd_PUTSingleRemotePath(self):
        """
        Remote path is extracted from first filename after local file.

        Any other data in the line is ignored.
        """
        localPath = self.makeFile()
        flags = (
            filetransfer.FXF_WRITE |
            filetransfer.FXF_CREAT |
            filetransfer.FXF_TRUNC
            )
        remoteName = '/remote-path'
        remoteFile = InMemoryRemoteFile(remoteName)
        self.fakeFilesystem.put(remoteName, flags, defer.succeed(remoteFile))

        deferred = self.client.cmd_PUT(
            '%s %s ignored' % (localPath, remoteName))
        self.successResultOf(deferred)

        self.checkPutMessage([(localPath, remoteName, ['100% 0.0B'])])
        self.assertTrue(remoteFile._closed)
        self.assertEqual(b'', remoteFile.getvalue())


    def test_cmd_PUTMultipleNoRemotePath(self):
        """
        When a globbing expression is used local files are transferred with
        remote file names based on local names.
        """
        first = self.makeFile()
        firstName = os.path.basename(first)
        secondName = 'second-name'
        parent = os.path.dirname(first)
        second = self.makeFile(path=os.path.join(parent, secondName))
        flags = (
            filetransfer.FXF_WRITE |
            filetransfer.FXF_CREAT |
            filetransfer.FXF_TRUNC
            )
        firstRemotePath = '/%s' % (firstName,)
        secondRemotePath = '/%s' % (secondName,)
        firstRemoteFile = InMemoryRemoteFile(firstRemotePath)
        secondRemoteFile = InMemoryRemoteFile(secondRemotePath)
        self.fakeFilesystem.put(
            firstRemotePath, flags, defer.succeed(firstRemoteFile))
        self.fakeFilesystem.put(
            secondRemotePath, flags, defer.succeed(secondRemoteFile))

        deferred = self.client.cmd_PUT(os.path.join(parent, '*'))
        self.successResultOf(deferred)

        self.assertTrue(firstRemoteFile._closed)
        self.assertEqual(b'', firstRemoteFile.getvalue())
        self.assertTrue(secondRemoteFile._closed)
        self.assertEqual(b'', secondRemoteFile.getvalue())
        self.checkPutMessage([
            (first, firstRemotePath, ['100% 0.0B']),
            (second, secondRemotePath, ['100% 0.0B']),
            ],
            randomOrder=True,
            )


    def test_cmd_PUTMultipleWithRemotePath(self):
        """
        When a globbing expression is used local files are transferred with
        remote file names based on local names.
        When a remote folder is requested remote paths are composed from
        remote path and local filename.
        """
        first = self.makeFile()
        firstName = os.path.basename(first)
        secondName = 'second-name'
        parent = os.path.dirname(first)
        second = self.makeFile(path=os.path.join(parent, secondName))
        flags = (
            filetransfer.FXF_WRITE |
            filetransfer.FXF_CREAT |
            filetransfer.FXF_TRUNC
            )
        firstRemoteFile = InMemoryRemoteFile(firstName)
        secondRemoteFile = InMemoryRemoteFile(secondName)
        firstRemotePath = '/remote/%s' % (firstName,)
        secondRemotePath = '/remote/%s' % (secondName,)
        self.fakeFilesystem.put(
            firstRemotePath, flags, defer.succeed(firstRemoteFile))
        self.fakeFilesystem.put(
            secondRemotePath, flags, defer.succeed(secondRemoteFile))

        deferred = self.client.cmd_PUT(
            '%s remote' % (os.path.join(parent, '*'),))
        self.successResultOf(deferred)

        self.assertTrue(firstRemoteFile._closed)
        self.assertEqual(b'', firstRemoteFile.getvalue())
        self.assertTrue(secondRemoteFile._closed)
        self.assertEqual(b'', secondRemoteFile.getvalue())
        self.checkPutMessage([
            (first, firstName, ['100% 0.0B']),
            (second, secondName, ['100% 0.0B']),
            ],
            randomOrder=True,
            )



class FileTransferTestRealm:
    def __init__(self, testDir):
        self.testDir = testDir


    def requestAvatar(self, avatarID, mind, *interfaces):
        a = FileTransferTestAvatar(self.testDir)
        return interfaces[0], a, lambda: None



class SFTPTestProcess(protocol.ProcessProtocol):
    """
    Protocol for testing cftp. Provides an interface between Python (where all
    the tests are) and the cftp client process (which does the work that is
    being tested).
    """

    def __init__(self, onOutReceived):
        """
        @param onOutReceived: A L{Deferred} to be fired as soon as data is
        received from stdout.
        """
        self.clearBuffer()
        self.onOutReceived = onOutReceived
        self.onProcessEnd = None
        self._expectingCommand = None
        self._processEnded = False


    def clearBuffer(self):
        """
        Clear any buffered data received from stdout. Should be private.
        """
        self.buffer = b''
        self._linesReceived = []
        self._lineBuffer = b''


    def outReceived(self, data):
        """
        Called by Twisted when the cftp client prints data to stdout.
        """
        log.msg('got %r' % data)
        lines = (self._lineBuffer + data).split(b'\n')
        self._lineBuffer = lines.pop(-1)
        self._linesReceived.extend(lines)
        # XXX - not strictly correct.
        # We really want onOutReceived to fire after the first 'cftp>' prompt
        # has been received. (See use in OurServerCmdLineClientTests.setUp)
        if self.onOutReceived is not None:
            d, self.onOutReceived = self.onOutReceived, None
            d.callback(data)
        self.buffer += data
        self._checkForCommand()


    def _checkForCommand(self):
        prompt = b'cftp> '
        if self._expectingCommand and self._lineBuffer == prompt:
            buf = b'\n'.join(self._linesReceived)
            if buf.startswith(prompt):
                buf = buf[len(prompt):]
            self.clearBuffer()
            d, self._expectingCommand = self._expectingCommand, None
            d.callback(buf)


    def errReceived(self, data):
        """
        Called by Twisted when the cftp client prints data to stderr.
        """
        log.msg('err: %s' % data)


    def getBuffer(self):
        """
        Return the contents of the buffer of data received from stdout.
        """
        return self.buffer


    def runCommand(self, command):
        """
        Issue the given command via the cftp client. Return a C{Deferred} that
        fires when the server returns a result. Note that the C{Deferred} will
        callback even if the server returns some kind of error.

        @param command: A string containing an sftp command.

        @return: A C{Deferred} that fires when the sftp server returns a
        result. The payload is the server's response string.
        """
        self._expectingCommand = defer.Deferred()
        self.clearBuffer()
        if isinstance(command, unicode):
            command = command.encode("utf-8")
        self.transport.write(command + b'\n')
        return self._expectingCommand


    def runScript(self, commands):
        """
        Run each command in sequence and return a Deferred that fires when all
        commands are completed.

        @param commands: A list of strings containing sftp commands.

        @return: A C{Deferred} that fires when all commands are completed. The
        payload is a list of response strings from the server, in the same
        order as the commands.
        """
        sem = defer.DeferredSemaphore(1)
        dl = [sem.run(self.runCommand, command) for command in commands]
        return defer.gatherResults(dl)


    def killProcess(self):
        """
        Kill the process if it is still running.

        If the process is still running, sends a KILL signal to the transport
        and returns a C{Deferred} which fires when L{processEnded} is called.

        @return: a C{Deferred}.
        """
        if self._processEnded:
            return defer.succeed(None)
        self.onProcessEnd = defer.Deferred()
        self.transport.signalProcess('KILL')
        return self.onProcessEnd


    def processEnded(self, reason):
        """
        Called by Twisted when the cftp client process ends.
        """
        self._processEnded = True
        if self.onProcessEnd:
            d, self.onProcessEnd = self.onProcessEnd, None
            d.callback(None)



class CFTPClientTestBase(SFTPTestBase):
    def setUp(self):
        with open('dsa_test.pub', 'wb') as f:
            f.write(test_ssh.publicDSA_openssh)
        with open('dsa_test', 'wb') as f:
            f.write(test_ssh.privateDSA_openssh)
        # 0o100600 == 33152: regular file, readable and writable by owner only.
        os.chmod('dsa_test', 0o100600)
        with open('kh_test', 'wb') as f:
            f.write(b'127.0.0.1 ' + test_ssh.publicRSA_openssh)
        return SFTPTestBase.setUp(self)


    def startServer(self):
        realm = FileTransferTestRealm(self.testDir)
        p = portal.Portal(realm)
        p.registerChecker(test_ssh.conchTestPublicKeyChecker())
        fac = test_ssh.ConchTestServerFactory()
        fac.portal = p
        self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")


    def stopServer(self):
        if not hasattr(self.server.factory, 'proto'):
            return self._cbStopServer(None)
        self.server.factory.proto.expectedLoseConnection = 1
        d = defer.maybeDeferred(
            self.server.factory.proto.transport.loseConnection)
        d.addCallback(self._cbStopServer)
        return d


    def _cbStopServer(self, ignored):
        return defer.maybeDeferred(self.server.stopListening)


    def tearDown(self):
        for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
            try:
                os.remove(f)
            except OSError:
                pass
        return SFTPTestBase.tearDown(self)



class OurServerCmdLineClientTests(CFTPClientTestBase):
    """
    Functional tests which launch an SFTP server over TCP on localhost and
    check the cftp command line interface using a spawned process.

    Because the client runs in a spawned process, you cannot set a debugger
    breakpoint in the client code.
    """

    def setUp(self):
        CFTPClientTestBase.setUp(self)

        self.startServer()
        cmds = ('-p %i -l testuser '
               '--known-hosts kh_test '
               '--user-authentications publickey '
               '--host-key-algorithms ssh-rsa '
               '-i dsa_test '
               '-a '
               '-v '
               '127.0.0.1')
        port = self.server.getHost().port
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
        log.msg('running %s %s' % (sys.executable, cmds))
        d = defer.Deferred()
        self.processProtocol = SFTPTestProcess(d)
        d.addCallback(lambda _: self.processProtocol.clearBuffer())
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        encodedCmds = []
        encodedEnv = {}
        for cmd in cmds:
            if isinstance(cmd, unicode):
                cmd = cmd.encode("utf-8")
            encodedCmds.append(cmd)
        for var in env:
            val = env[var]
            if isinstance(var, unicode):
                var = var.encode("utf-8")
            if isinstance(val, unicode):
                val = val.encode("utf-8")
            encodedEnv[var] = val
        log.msg(encodedCmds)
        log.msg(encodedEnv)
        reactor.spawnProcess(self.processProtocol, sys.executable, encodedCmds,
                             env=encodedEnv)
        return d


    def tearDown(self):
        d = self.stopServer()
        d.addCallback(lambda _: self.processProtocol.killProcess())
        return d


    def _killProcess(self, ignored):
        try:
            self.processProtocol.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            pass


    def runCommand(self, command):
        """
        Run the given command with the cftp client. Return a C{Deferred} that
        fires when the command is complete. Payload is the server's output for
        that command.
        """
        return self.processProtocol.runCommand(command)


    def runScript(self, *commands):
        """
        Run the given commands with the cftp client. Returns a C{Deferred}
        that fires when the commands are all complete. The C{Deferred}'s
        payload is a list of output for each command.
        """
        return self.processProtocol.runScript(commands)


    def testCdPwd(self):
        """
        Test that 'pwd' reports the current remote directory, that 'lpwd'
        reports the current local directory, and that changing to a
        subdirectory then changing to its parent leaves you in the original
        remote directory.
        """
        # XXX - not actually a unit test, see docstring.
        homeDir = self.testDir
        d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd')

        def cmdOutput(output):
            """
            Callback function for handling command output.
            """
            cmds = []
            for cmd in output:
                if _PY3 and isinstance(cmd, bytes):
                    cmd = cmd.decode("utf-8")
                cmds.append(cmd)
            return cmds[:3] + cmds[4:]

        d.addCallback(cmdOutput)
        d.addCallback(self.assertEqual,
                      [homeDir.path, os.getcwd(), '', homeDir.path])
        return d


    def testChAttrs(self):
        """
        Check that 'ls -l' output includes the access permissions and that
        this output changes appropriately with 'chmod'.
        """
        def _check(results):
            self.flushLoggedErrors()
            self.assertTrue(results[0].startswith(b'-rw-r--r--'))
            self.assertEqual(results[1], b'')
            self.assertTrue(results[2].startswith(b'----------'), results[2])
            self.assertEqual(results[3], b'')

        d = self.runScript('ls -l testfile1', 'chmod 0 testfile1',
                           'ls -l testfile1', 'chmod 644 testfile1')
        return d.addCallback(_check)
        # XXX test chgrp/own


    def testList(self):
        """
        Check 'ls' works as expected. Checks for wildcards, hidden files,
        listing directories and listing empty directories.
        """
        def _check(results):
            self.assertEqual(results[0], [b'testDirectory', b'testRemoveFile',
                                          b'testRenameFile', b'testfile1'])
            self.assertEqual(results[1], [b'testDirectory', b'testRemoveFile',
                                          b'testRenameFile', b'testfile1'])
            self.assertEqual(results[2], [b'testRemoveFile', b'testRenameFile'])
            self.assertEqual(results[3], [b'.testHiddenFile', b'testRemoveFile',
                                          b'testRenameFile'])
            self.assertEqual(results[4], [b''])
        d = self.runScript('ls', 'ls ../' + self.testDir.basename(),
                           'ls *File', 'ls -a *File', 'ls -l testDirectory')
        d.addCallback(lambda xs: [x.split(b'\n') for x in xs])
        return d.addCallback(_check)


    def testHelp(self):
        """
        Check that running the '?' command returns help.
        """
        d = self.runCommand('?')

        helpText = cftp.StdioClient(None).cmd_HELP('').strip()
        if isinstance(helpText, unicode):
            helpText = helpText.encode("utf-8")
        d.addCallback(self.assertEqual, helpText)
        return d


    def assertFilesEqual(self, name1, name2, msg=None):
        """
        Assert that the files at C{name1} and C{name2} contain exactly the
        same data.
        """
        self.assertEqual(name1.getContent(), name2.getContent(), msg)


    def testGet(self):
        """
        Test that 'get' saves the remote file to the correct local location,
        that the output of 'get' is correct and that 'rm' actually removes
        the file.
        """
        # XXX - not actually a unit test
        expectedOutput = ("Transferred %s/testfile1 to %s/test file2"
                          % (self.testDir.path, self.testDir.path))
        if isinstance(expectedOutput, unicode):
            expectedOutput = expectedOutput.encode("utf-8")
        def _checkGet(result):
            self.assertTrue(result.endswith(expectedOutput))
            self.assertFilesEqual(self.testDir.child('testfile1'),
                                  self.testDir.child('test file2'),
                                  "get failed")
            return self.runCommand('rm "test file2"')

        d = self.runCommand('get testfile1 "%s/test file2"' % (self.testDir.path,))
        d.addCallback(_checkGet)
        d.addCallback(lambda _: self.assertFalse(
            self.testDir.child('test file2').exists()))
        return d


    def testWildcardGet(self):
        """
        Test that 'get' works correctly when given wildcard parameters.
        """
        def _check(ignored):
            self.assertFilesEqual(self.testDir.child('testRemoveFile'),
                                  FilePath('testRemoveFile'),
                                  'testRemoveFile get failed')
            self.assertFilesEqual(self.testDir.child('testRenameFile'),
                                  FilePath('testRenameFile'),
                                  'testRenameFile get failed')

        d = self.runCommand('get testR*')
        return d.addCallback(_check)


    def testPut(self):
        """
        Check that 'put' uploads files correctly and that they can be
        successfully removed. Also check the output of the put command.
        """
        # XXX - not actually a unit test
        expectedOutput = (b'Transferred ' + self.testDir.asBytesMode().path +
                          b'/testfile1 to ' + self.testDir.asBytesMode().path +
                          b'/test"file2')
        def _checkPut(result):
            self.assertFilesEqual(self.testDir.child('testfile1'),
                                  self.testDir.child('test"file2'))
            self.assertTrue(result.endswith(expectedOutput))
            return self.runCommand('rm "test\\"file2"')

        d = self.runCommand('put %s/testfile1 "test\\"file2"'
                            % (self.testDir.path,))
        d.addCallback(_checkPut)
        d.addCallback(lambda _: self.assertFalse(
            self.testDir.child('test"file2').exists()))
        return d


    def test_putOverLongerFile(self):
        """
        Check that 'put' uploads files correctly when overwriting a longer
        file.
        """
        # XXX - not actually a unit test
        with self.testDir.child('shorterFile').open(mode='w') as f:
            f.write(b"a")
        with self.testDir.child('longerFile').open(mode='w') as f:
            f.write(b"bb")
        def _checkPut(result):
            self.assertFilesEqual(self.testDir.child('shorterFile'),
                                  self.testDir.child('longerFile'))

        d = self.runCommand('put %s/shorterFile longerFile'
                            % (self.testDir.path,))
        d.addCallback(_checkPut)
        return d


    def test_putMultipleOverLongerFile(self):
        """
        Check that 'put' uploads files correctly when overwriting a longer
        file and you use a wildcard to specify the files to upload.
        """
        # XXX - not actually a unit test
        someDir = self.testDir.child('dir')
        someDir.createDirectory()
        with someDir.child('file').open(mode='w') as f:
            f.write(b"a")
        with self.testDir.child('file').open(mode='w') as f:
            f.write(b"bb")
        def _checkPut(result):
            self.assertFilesEqual(someDir.child('file'),
                                  self.testDir.child('file'))

        d = self.runCommand('put %s/dir/*'
                            % (self.testDir.path,))
        d.addCallback(_checkPut)
        return d


    def testWildcardPut(self):
        """
        Check that a 'put' command whose parameter includes a wildcard (i.e.
        '*') uploads all files matching the wildcard to the correct
        directory.
        """
        def check(results):
            self.assertEqual(results[0], b'')
            self.assertEqual(results[2], b'')

            self.assertFilesEqual(self.testDir.child('testRemoveFile'),
                                  self.testDir.parent().child('testRemoveFile'),
                                  'testRemoveFile put failed')
            self.assertFilesEqual(self.testDir.child('testRenameFile'),
                                  self.testDir.parent().child('testRenameFile'),
                                  'testRenameFile put failed')

        d = self.runScript('cd ..',
                           'put %s/testR*' % (self.testDir.path,),
                           'cd %s' % self.testDir.basename())
        d.addCallback(check)
        return d


    def testLink(self):
        """
        Test that 'ln' creates a file which appears as a link in the output of
        'ls'. Check that removing the new file succeeds without output.
        """
        def _check(results):
            self.flushLoggedErrors()
            self.assertEqual(results[0], b'')
            self.assertTrue(results[1].startswith(b'l'), 'link failed')
            return self.runCommand('rm testLink')

        d = self.runScript('ln testLink testfile1', 'ls -l testLink')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, b'')
        return d


    def testRemoteDirectory(self):
        """
        Test that we can create and remove directories with the cftp client.
        """
        def _check(results):
            self.assertEqual(results[0], b'')
            self.assertTrue(results[1].startswith(b'd'))
            return self.runCommand('rmdir testMakeDirectory')

        d = self.runScript('mkdir testMakeDirectory',
                           'ls -l testMakeDirector?')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, b'')
        return d


    def test_existingRemoteDirectory(self):
        """
        Test that a C{mkdir} on an existing directory fails with the
        appropriate error, and doesn't log a useless error server side.
        """
        def _check(results):
            self.assertEqual(results[0], b'')
            self.assertEqual(results[1],
                              b'remote error 11: mkdir failed')

        d = self.runScript('mkdir testMakeDirectory',
                           'mkdir testMakeDirectory')
        d.addCallback(_check)
        return d


    def testLocalDirectory(self):
        """
        Test that we can create a directory locally and remove it with the
        cftp client. This test works because the 'remote' server is running
        out of a local directory.
        """
        d = self.runCommand('lmkdir %s/testLocalDirectory' % (self.testDir.path,))
        d.addCallback(self.assertEqual, b'')
        d.addCallback(lambda _: self.runCommand('rmdir testLocalDirectory'))
        d.addCallback(self.assertEqual, b'')
        return d


    def testRename(self):
        """
        Test that we can rename a file.
        """
        def _check(results):
            self.assertEqual(results[0], b'')
            self.assertEqual(results[1], b'testfile2')
            return self.runCommand('rename testfile2 testfile1')

        d = self.runScript('rename testfile1 testfile2', 'ls testfile?')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, b'')
        return d



class OurServerBatchFileTests(CFTPClientTestBase):
    """
    Functional tests which launch an SFTP server on localhost and check the
    cftp batch interface.
    """

    def setUp(self):
        CFTPClientTestBase.setUp(self)
        self.startServer()


    def tearDown(self):
        CFTPClientTestBase.tearDown(self)
        return self.stopServer()


    def _getBatchOutput(self, f):
        fn = self.mktemp()
        with open(fn, 'w') as fp:
            fp.write(f)
        port = self.server.getHost().port
        cmds = ('-p %i -l testuser '
                    '--known-hosts kh_test '
                    '--user-authentications publickey '
                    '--host-key-algorithms ssh-rsa '
                    '-i dsa_test '
                    '-a '
                    '-v -b %s 127.0.0.1') % (port, fn)
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
        log.msg('running %s %s' % (sys.executable, cmds))
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)

        self.server.factory.expectedLoseConnection = 1

        d = getProcessOutputAndValue(sys.executable, cmds, env=env)

        def _cleanup(res):
            os.remove(fn)
            return res

        d.addCallback(lambda res: res[0])
        d.addBoth(_cleanup)

        return d


    def testBatchFile(self):
        """
        Test the batch file function of cftp ('cftp -b batchfile').
        This works by treating the file as a list of commands to be run.
        """
        cmds = """pwd
ls
exit
"""

        def _cbCheckResult(res):
            res = res.split(b'\n')
            log.msg('RES %s' % repr(res))
            self.assertIn(self.testDir.asBytesMode().path, res[1])
            self.assertEqual(res[3:-2], [b'testDirectory', b'testRemoveFile',
                                             b'testRenameFile', b'testfile1'])

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d


    def testError(self):
        """
        Test that an error in the batch file stops running the batch.
        """
        cmds = """chown 0 missingFile
pwd
exit
"""

        def _cbCheckResult(res):
            self.assertNotIn(self.testDir.asBytesMode().path, res)

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d


    def testIgnoredError(self):
        """
        Test that a minus sign '-' at the front of a line causes any errors
        from that command to be ignored.
        """
        cmds = """-chown 0 missingFile
pwd
exit
"""
        def _cbCheckResult(res):
            self.assertIn(self.testDir.asBytesMode().path, res)

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d



class OurServerSftpClientTests(CFTPClientTestBase):
    """
    Test the sftp server against the sftp command line client.
    """

    def setUp(self):
        CFTPClientTestBase.setUp(self)
        return self.startServer()


    def tearDown(self):
        return self.stopServer()


    def test_extendedAttributes(self):
        """
        Test the return of extended attributes by the server: the sftp client
        should ignore them, but still be able to parse the response correctly.

        This test is mainly here to check that
        L{filetransfer.FILEXFER_ATTR_EXTENDED} has the correct value.
        """
        fn = self.mktemp()
        with open(fn, 'w') as f:
            f.write("ls .\nexit")
        port = self.server.getHost().port

        oldGetAttr = FileTransferForTestAvatar._getAttrs
        def _getAttrs(self, s):
            attrs = oldGetAttr(self, s)
            attrs["ext_foo"] = "bar"
            return attrs

        self.patch(FileTransferForTestAvatar, "_getAttrs", _getAttrs)
        self.server.factory.expectedLoseConnection = True

        # PubkeyAcceptedKeyTypes does not exist prior to OpenSSH 7.0 so we
        # first need to check if we can set it. If we can, -V will just print
        # the version without doing anything else; if we can't, we will get a
        # configuration error.
        d = getProcessValue(
            'ssh', ('-o', 'PubkeyAcceptedKeyTypes=ssh-dss', '-V'))
        def hasPAKT(status):
            if status == 0:
                args = ('-o', 'PubkeyAcceptedKeyTypes=ssh-dss')
            else:
                args = ()
            # Pass -F /dev/null to prevent the user's configuration file from
            # being loaded, as it may contain settings that cause our tests to
            # fail or hang.
            args += ('-F', '/dev/null',
                     '-o', 'IdentityFile=dsa_test',
                     '-o', 'UserKnownHostsFile=kh_test',
                     '-o', 'HostKeyAlgorithms=ssh-rsa',
                     '-o', 'Port=%i' % (port,), '-b', fn, 'testuser@127.0.0.1')
            return args

        def check(result):
            self.assertEqual(result[2], 0)
            for i in [b'testDirectory', b'testRemoveFile',
                      b'testRenameFile', b'testfile1']:
                self.assertIn(i, result[0])
        d.addCallback(hasPAKT)
        d.addCallback(lambda args: getProcessOutputAndValue('sftp', args))
        return d.addCallback(check)



if None in (unix, cryptography, pyasn1,
            interfaces.IReactorProcess(reactor, None)):
    if _reason is None:
        _reason = "don't run w/o spawnProcess or cryptography or pyasn1"
    OurServerCmdLineClientTests.skip = _reason
    OurServerBatchFileTests.skip = _reason
    OurServerSftpClientTests.skip = _reason
    StdioClientTests.skip = _reason
    SSHSessionTests.skip = _reason
else:
    from twisted.python.procutils import which
    if not which('sftp'):
        OurServerSftpClientTests.skip = "no sftp command-line client available"
