# File: /proc/self/root/lib/python3/dist-packages/uaclient/util.py
import datetime
import json
import logging
import os
import re
import sys
import textwrap
import time
from functools import wraps
from typing import Any, Dict, List, Optional, Union  # noqa: F401

from uaclient import exceptions, messages
from uaclient.defaults import CONFIG_FIELD_ENVVAR_ALLOWLIST
from uaclient.types import MessagingOperations

DROPPED_KEY = object()


def replace_top_level_logger_name(name: str) -> str:
    """Replace the first component of a dotted name with "ubuntupro"."""
    if name == "":
        return ""
    names = name.split(".")
    names[0] = "ubuntupro"
    return ".".join(names)


LOG = logging.getLogger(replace_top_level_logger_name(__name__))


class DatetimeAwareJSONEncoder(json.JSONEncoder):
    """A json.JSONEncoder subclass that writes out isoformat'd datetimes."""

    def default(self, o):
        if isinstance(o, datetime.datetime):
            return o.isoformat()
        return super().default(o)


class DatetimeAwareJSONDecoder(json.JSONDecoder):
    """
    A JSONDecoder that parses some ISO datetime strings to datetime objects.

    Important note: the "some" is because we seem to only be able to extend
    Python's json library in a way that lets us convert string values within
    JSON objects (e.g. '{"lastModified": "2019-07-25T14:35:51"}'). Strings
    outside of JSON objects (e.g. '"2019-07-25T14:35:51"') will not be passed
    through our decoder.

    (N.B. This will override any object_hook specified using arguments to it,
    or used in load or loads calls that specify this as the cls.)
    """

    def __init__(self, *args, **kwargs):
        if "object_hook" in kwargs:
            kwargs.pop("object_hook")
        super().__init__(*args, object_hook=self.object_hook, **kwargs)

    @staticmethod
    def object_hook(o):
        for key, value in o.items():
            if isinstance(value, str):
                try:
                    new_value = parse_rfc3339_date(
                        value
                    )  # type: Union[str, datetime.datetime]
                except ValueError:
                    # This isn't a string containing a valid ISO 8601 datetime
                    new_value = value
                o[key] = new_value
        return o


def retry(exception, retry_sleeps):
    """Decorator to retry on exception for retry_sleeps.

    @param retry_sleeps: List of sleep lengths to apply between
       retries. Specifying a list of [0.5, 1] tells the decorated function
       to retry twice on failure, sleeping half a second before the first
       retry and 1 second before the second retry.
    @param exception: The exception class to catch and retry for the provided
       retry_sleeps. Any other exception types will not be caught by the
       decorator.
    """

    def wrapper(f):
        @wraps(f)
        def decorator(*args, **kwargs):
            sleeps = retry_sleeps.copy()
            while True:
                try:
                    return f(*args, **kwargs)
                except exception as e:
                    if not sleeps:
                        raise e
                    LOG.debug(
                        "%s: Retrying %d more times.", str(e), len(sleeps)
                    )
                    time.sleep(sleeps.pop(0))

        return decorator

    return wrapper


def get_dict_deltas(
    orig_dict: Dict[str, Any], new_dict: Dict[str, Any], path: str = ""
) -> Dict[str, Any]:
    """Return a dictionary of delta between orig_dict and new_dict."""
    deltas = {}  # type: Dict[str, Any]
    for key, value in orig_dict.items():
        new_value = new_dict.get(key, DROPPED_KEY)
        key_path = key if not path else path + "." + key
        if isinstance(value, dict):
            if key in new_dict:
                sub_delta = get_dict_deltas(
                    value, new_dict[key], path=key_path
                )
                if sub_delta:
                    deltas[key] = sub_delta
            else:
                deltas[key] = DROPPED_KEY
        elif value != new_value:
            LOG.debug(
                "Contract value for '%s' changed to '%s'",
                key_path,
                str(new_value),
            )
            deltas[key] = new_value
    for key, value in new_dict.items():
        if key not in orig_dict:
            deltas[key] = value
    return deltas


def prompt_choices(msg: str = "", valid_choices: List[str] = []) -> str:
    """Interactive prompt message, returning a valid choice from msg.

    Expects a structured msg which designates choices with square brackets []
    around the characters which indicate a valid choice.

    Uppercase and lowercase responses are allowed. Loop on invalid choices.

    :return: Valid response character chosen.
    """
    from uaclient import event_logger

    event = event_logger.get_event_logger()
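    valid_choices_str = ", ".join(choice.upper() for choice in valid_choices)
    while True:
        event.info(msg)
        value = input("> ").lower()
        if value in valid_choices:
            break
        # Build the error message from the response that was just rejected,
        # so the rejected input is actually shown to the user.
        event.info("{} is not one of: {}".format(value, valid_choices_str))
    return value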


def prompt_for_confirmation(
    msg: str = "", assume_yes: bool = False, default: bool = False
) -> bool:
    """
    Display a confirmation prompt, returning a bool indicating the response

    :param msg: String custom prompt text to emit from input call.
    :param assume_yes: Boolean set True to skip confirmation input and return
        True.
    :param default: Boolean to return when user doesn't enter any text

    This function will only prompt a single time. An empty response returns
    `default` (False unless overridden); any response other than "y" or
    "yes" returns False.
    """
    if assume_yes:
        return True
    if not msg:
        msg = messages.PROMPT_YES_NO
    value = input(msg).lower().strip()
    if value == "":
        return default
    if value in ["y", "yes"]:
        return True
    return False


def is_config_value_true(config: Dict[str, Any], path_to_value: str) -> bool:
    """Check if value parameter can be translated into a boolean 'True' value.

    @param config: A config dict representing
                   /etc/ubuntu-advantage/uaclient.conf
    @param path_to_value: The path from where the value parameter was
                          extracted.
    @return: A boolean value indicating if the value parameter corresponds
             to a 'True' boolean value.
    @raises exceptions.UbuntuProError when the value provided by the
            path_to_value parameter cannot be translated into either
            a 'False' or 'True' boolean value.
    """
    value = config
    default_value = {}  # type: Any
    paths = path_to_value.split(".")
    leaf_value = paths[-1]
    for key in paths:
        if key == leaf_value:
            default_value = "false"

        if isinstance(value, dict):
            value = value.get(key, default_value)
        else:
            return False

    value_str = str(value)
    if value_str.lower() == "true":
        return True
    elif value_str.lower() == "false":
        return False
    else:
        raise exceptions.InvalidBooleanConfigValue(
            path_to_value=path_to_value,
            expected_value="boolean string: true or false",
            value=value_str,
        )


REDACT_SENSITIVE_LOGS = [
    r"(Bearer )[^\']+",
    r"(\'attach\', \')[^\']+",
    r"(\'machineToken\': \')[^\']+",
    r"(\'token\': \')[^\']+",
    r"(\'X-aws-ec2-metadata-token\': \')[^\']+",
    r"(.*\[PUT\] response.*api/token,.*data: ).*",
    r"(https://bearer:)[^\@]+",
    r"(/snap/bin/canonical-livepatch\s+enable\s+)[^\s]+",
    r"(Contract\s+value\s+for\s+'resourceToken'\s+changed\s+to\s+).*",
    r"(\'resourceToken\': \')[^\']+",
    r"(\'contractToken\': \')[^\']+",
    r"(https://contracts.canonical.com/v1/resources/livepatch\?token=)[^\s]+",
    r"(\"identityToken\": \")[^\"]+",
    r"(response:\s+http://metadata/computeMetadata/v1/instance/"
    "service-accounts.*data: ).*",
    r"(\'userCode\': \')[^\']+",
    r"(\'magic_token=)[^\']+",
    r"(--registration-key=\")[^\"]+",
    r"(--registration-key=\')[^\']+",
    r"(--registration-key=)[^ ]+",
    r"(--registration-key \")[^\"]+",
    r"(--registration-key \')[^\']+",
    r"(--registration-key )[^\s]+",
    r"(-p \")[^\"]+",
    r"(-p \')[^\']+",
    r"(-p )[^\s]+",
]


def redact_sensitive_logs(
    log, redact_regexs: List[str] = REDACT_SENSITIVE_LOGS
) -> str:
    """Redact known sensitive information from log content."""
    redacted_log = log
    for redact_regex in redact_regexs:
        redacted_log = re.sub(redact_regex, r"\g<1><REDACTED>", redacted_log)
    return redacted_log
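

# Illustrative sketch, not part of the upstream module: it applies a single
# pattern from REDACT_SENSITIVE_LOGS to one made-up log line, to show how the
# first capture group is kept and everything after it is replaced. The helper
# name and the sample log line are hypothetical.
def _example_redact_bearer_token() -> str:
    import re  # local import keeps this sketch self-contained

    pattern = r"(Bearer )[^\']+"  # copied from REDACT_SENSITIVE_LOGS above
    log_line = "'Authorization': 'Bearer hypothetical-secret'"
    # \g<1> re-emits "Bearer "; the token itself is replaced by <REDACTED>.
    return re.sub(pattern, r"\g<1><REDACTED>", log_line)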


def handle_message_operations(
    msg_ops: Optional[MessagingOperations], assume_yes: bool
) -> bool:
    """Emit messages to the console for user interaction

    :param msg_ops: A list of strings or tuples. Any string items are printed.
        Any tuples will contain a callable and a dict of args to pass to the
        callable. Callables are expected to return True on success and
        False upon failure.
    :param assume_yes: Boolean passed to every callable as the "assume_yes"
        keyword argument.

    :return: True upon success, False on failure.
    """
    if not msg_ops:
        return True

    for msg_op in msg_ops:
        if isinstance(msg_op, str):
            print(msg_op)
        else:  # Then we are a callable and dict of args
            functor, args = msg_op
            args["assume_yes"] = assume_yes
            if not functor(**args):
                return False
    return True


def parse_rfc3339_date(dt_str: str) -> datetime.datetime:
    """
    Parse a datestring in rfc3339 format. Originally written for compatibility
    with golang's time.MarshalJSON function. Also handles the output of
    Python's datetime.isoformat method.

    This drops subseconds.

    :param dt_str: a date string in rfc3339 format

    :return: datetime.datetime object of time represented by dt_str
    """
    # remove sub-seconds
    # Examples:
    #   Before: "2001-02-03T04:05:06.123456"
    #   After: "2001-02-03T04:05:06"
    #   Before: "2001-02-03T04:05:06.123456Z"
    #   After: "2001-02-03T04:05:06Z"
    #   Before: "2001-02-03T04:05:06.123456+09:00"
    #   After: "2001-02-03T04:05:06+09:00"
    dt_str_without_subseconds = re.sub(
        r"(\d{2}:\d{2}:\d{2})\.\d+", r"\g<1>", dt_str
    )
    # if there is no timezone info, assume UTC
    # Examples:
    #   Before: "2001-02-03T04:05:06"
    #   After: "2001-02-03T04:05:06Z"
    #   Before: "2001-02-03T04:05:06Z"
    #   After: "2001-02-03T04:05:06Z"
    #   Before: "2001-02-03T04:05:06+09:00"
    #   After: "2001-02-03T04:05:06+09:00"
    dt_str_with_z = re.sub(
        r"(\d{2}:\d{2}:\d{2})$", r"\g<1>Z", dt_str_without_subseconds
    )
    # replace Z with offset for UTC
    # Examples:
    #   Before: "2001-02-03T04:05:06Z"
    #   After: "2001-02-03T04:05:06+00:00"
    #   Before: "2001-02-03T04:05:06+09:00"
    #   After: "2001-02-03T04:05:06+09:00"
    dt_str_without_z = dt_str_with_z.replace("Z", "+00:00")
    # change offset format to not include colon `:`
    # Examples:
    #   Before: "2001-02-03T04:05:06+00:00"
    #   After: "2001-02-03T04:05:06+0000"
    #   Before: "2001-02-03T04:05:06+09:00"
    #   After: "2001-02-03T04:05:06+0900"
    dt_str_with_pythonish_tz = re.sub(
        r"(-|\+)(\d{2}):(\d{2})$", r"\g<1>\g<2>\g<3>", dt_str_without_z
    )
    return datetime.datetime.strptime(
        dt_str_with_pythonish_tz, "%Y-%m-%dT%H:%M:%S%z"
    )
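

# Illustrative sketch, not part of the upstream module: the final step above
# strips the colon from the UTC offset before calling strptime. This helper
# (hypothetical name) parses an already-normalized sample string with the
# same format and returns the offset in seconds, showing that the result is
# a timezone-aware datetime.
def _example_parse_colonless_offset() -> float:
    import datetime  # local import keeps this sketch self-contained

    parsed = datetime.datetime.strptime(
        "2001-02-03T04:05:06+0900", "%Y-%m-%dT%H:%M:%S%z"
    )
    offset = parsed.utcoffset()
    # %z always yields an aware datetime, so the offset is never None here.
    assert offset is not None
    return offset.total_seconds()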


def handle_unicode_characters(message: str) -> str:
    """
    Verify if the system can output unicode characters and if not,
    remove those characters from the message string.
    """
    if (
        sys.stdout.encoding is None
        or "UTF-8" not in sys.stdout.encoding.upper()
    ):
        # Replace our Unicode dash with an ASCII dash if we aren't going to be
        # writing to a utf-8 output; see
        # https://github.com/canonical/ubuntu-pro-client/issues/859
        message = message.replace("\u2014", "-")

        # Remove our unicode success/failure marks if we aren't going to be
        # writing to a utf-8 output; see
        # https://github.com/canonical/ubuntu-pro-client/issues/1463
        message = message.replace(messages.OKGREEN_CHECK + " ", "")
        message = message.replace(messages.FAIL_X + " ", "")

        # Now we remove any remaining unicode characters from the string
        message = message.encode("ascii", "ignore").decode()

    return message


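def get_pro_environment():
    """Return the subset of os.environ selected by the filters below.

    A variable is kept if its lowercased name is in
    CONFIG_FIELD_ENVVAR_ALLOWLIST, if its name starts with "UA_FEATURES",
    or if it is exactly "UA_CONFIG_FILE".
    """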
    return {
        k: v
        for k, v in os.environ.items()
        if k.lower() in CONFIG_FIELD_ENVVAR_ALLOWLIST
        or k.startswith("UA_FEATURES")
        or k == "UA_CONFIG_FILE"
    }


def depth_first_merge_overlay_dict(base_dict, overlay_dict):
    """Merge the contents of overlay_dict into base_dict, not only on
    top-level keys but at every depth of the overlay_dict object. For
    example, using these values as entries for the function:

    base_dict = {"a": 1, "b": {"c": 2, "d": 3}}
    overlay_dict = {"b": {"c": 10}}

    Should update base_dict into:

    {"a": 1, "b": {"c": 10, "d": 3}}

    @param base_dict: The dict to be updated
    @param overlay_dict: The dict with information to be added into base_dict
    """

    def update_dict_list(base_values, overlay_values, key):
        merge_id_key_map = {
            "availableResources": "name",
            "resourceEntitlements": "type",
            "overrides": "selector",
        }
        values_to_append = []
        id_key = merge_id_key_map.get(key)
        for overlay_value in overlay_values:
            was_replaced = False
            for base_value in base_values:
                if base_value.get(id_key) == overlay_value.get(id_key):
                    depth_first_merge_overlay_dict(base_value, overlay_value)
                    was_replaced = True

            if not was_replaced:
                values_to_append.append(overlay_value)

        base_values.extend(values_to_append)

    for key, value in overlay_dict.items():
        base_value = base_dict.get(key)
        if isinstance(base_value, dict) and isinstance(value, dict):
            depth_first_merge_overlay_dict(base_dict[key], value)
        elif isinstance(base_value, list) and isinstance(value, list):
            if len(base_value) and isinstance(base_value[0], dict):
                update_dict_list(base_dict[key], value, key=key)
            else:
                # Most other lists which aren't lists of dicts are lists of
                # strs. Replace that list with the overlay value.
                base_dict[key] = value
        else:
            base_dict[key] = value


ARCH_ALIASES = {
    "x86_64": "amd64",
    "i686": "i386",
    "ppc64le": "ppc64el",
    "aarch64": "arm64",
    "armv7l": "armhf",
}


def standardize_arch_name(arch: str) -> str:
    arch_lower = arch.lower()
    return ARCH_ALIASES.get(arch_lower, arch_lower)


def deduplicate_arches(arches: List[str]) -> List[str]:
    """Return the sorted, unique standardized names for the given arches."""
    return sorted({standardize_arch_name(arch) for arch in arches})


def we_are_currently_root() -> bool:
    return os.getuid() == 0


def set_filename_extension(filename: str, new_extension: str) -> str:
    name, _extension = os.path.splitext(filename)
    return name + "." + new_extension


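def create_package_list_str(
    package_list: List[str],
) -> str:
    """Return the package names joined by spaces and wrapped at 80 columns.

    Every line is indented by two spaces and the result ends with a newline.
    """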
    return (
        "\n".join(
            textwrap.wrap(
                " ".join(package_list),
                width=80,
                break_long_words=False,
                break_on_hyphens=False,
                initial_indent="  ",
                subsequent_indent="  ",
            )
        )
        + "\n"
    )
