#!/usr/bin/python3 """ * File: cltls * Version : 1.0 * License : BSD-3-Clause * * Copyright (c) 2025 - 2026 * Ralf Senderek, Ireland. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by Ralf Senderek. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * """ import sys, os from pathlib import Path OK = 0 ERR_USE = 1 ERR_PERM = 2 ERR_UNTRUSTED = 3 ERR_INSTALL = 4 ERR_INVALID = 5 ERR_DENY = 6 ERR_CONNECT = 7 ERR_REPLY = 8 ERR_REQUEST = 9 ERR_CORRUPT = 10 ERR_INCOMPLETE = 11 ERR_NOTFOUND = 12 ERR_UNKNOWN = 13 Status = 13 HOME = str(Path.home()) + "/.cryptlib/" # path to trusted information about servers in the end-user's home directory. CertPath = HOME + "certs/" TrustedPath = HOME + "certs/trusted/" DefaultPath = HOME + "cltls/" LogPath = HOME + "debug/" HashFile = "" CertFile = "" Mode = "GET" Version = "1.0" Authentication = "fingerprint" BufferSize = 1048570 Data = bytearray() HashLength = 32 # bytes for SHA2 HashMethod = "SHA-256" TLSVersion = "unknown" # is set if connect() succeeds CurrentSession = 0 # is set by TLS_activate RootCert = 0 Certificate = 0 ServerName = bytearray() ServerResource = bytearray() ServerPort = 443 UploadFile = bytearray() UploadData = bytearray() GETrequest = bytearray() PUTrequest = bytearray() MoreDataToRead = False # try to receive only one big chunk of data DEBUG = False # do not write debug info to STDERR LOG = False # do not write to the end-user's log file BINARY = True # store server data unchanged (default) STDOUT = False # store server data in a file (default) CHUNKED = False # do not extract lines from server data ENCODED = False # do not convert server data (default) EXTENSION = ".input" # default, no known file extension REDIRECT = False # do not allow redirection LTS = True # connect only with TLS-1.2 NOOUT = False # write received bytes into the filesystem (default) ASK = False # do not ask the user for permission to continue ALLOW_TLS13 = not LTS # do not use TLS-1.3 even if it is enabled in the library try: from cryptlib_py import * except: ERR_IMPORT = """ The python3 library is not installed. You need to install the packages cryptlib-python3 and cryptlib. You will find them for a variety of operating systems here: https://senderek.ie/cryptlib or in the Fedora repository. """ print( ERR_IMPORT ) exit( ERR_INSTALL ) #-----------------------------------------------------------# def debug ( message ): if DEBUG and message : sys.stderr.write(b'\x1b[0;34m'.decode()) sys.stderr.write( "Debug: " ) sys.stderr.write( message ) sys.stderr.write(b'\x1b[0m'.decode()) sys.stderr.write("\n") #-----------------------------------------------------------# def log ( message ): global LFD if message : # writing into the logfile is enabled with the "-silent" option only if (not LOG): sys.stdout.write( message ) sys.stdout.write("\n") else: # write all messages into the end-user's logfile LFD.write( message ) LFD.write("\n") #-----------------------------------------------------------# def error ( message ): global LFD if message : # writing into the logfile is enabled with the "-silent" option only if (not LOG): sys.stdout.write(b'\x1b[0;31m'.decode()) sys.stdout.write( "Error: " ) sys.stdout.write( message ) sys.stdout.write(b'\x1b[0m'.decode()) sys.stdout.write("\n") else: # write all messages into the end-user's logfile, no colours needed LFD.write( "Error: " + message ) LFD.write("\n") #-----------------------------------------------------------# def print_usage() : log( "usage: cltls [OPTIONs] GET https://ServerName/path/filename" ) log( " cltls [OPTIONs] GET ServerName /path/filename" ) log( " cltls [OPTIONs] PUT https://ServerName/path/filename UploadFile") log( " cltls [OPTIONs] PUT ServerName /path/filename UploadFile") log( " cltls STORE ServerName") log( " ") log( " OPTIONs are:") log( " -silent") log( " -stdout") log( " -noout") log( " -debug") log( " -LTS or -lts") log( " -redirect") log( " -ask") log( " -help") log( " -version") #-----------------------------------------------------------# def clean(): global TLS global Certificate try: cryptDestroyContext( TLS ) if ALLOW_TLS13: global TLS3 cryptDestroyContext( TLS3 ) # there may not be a Certificate cryptDestroyContext( Certificate ) cryptEnd() except: cryptEnd() #-----------------------------------------------------------# def safe_input(message): try: return input(message) except: log("") return "" #-----------------------------------------------------------# def sanitize(data): forbidden = "!\"ยง&$%()[]{}=?*;,<>\\" good = "" for i in range(len(data)) : if (data[i] not in forbidden) and (ord(data[i]) < 128): good += data[i] return good #-----------------------------------------------------------# def chmodf (FileName): # FileName is a string try: os.chmod(sanitize(FileName),0o600) return True except: return False #-----------------------------------------------------------# def create_dir(path): # creates all directories in the path path = sanitize(path) try: os.makedirs(path,exist_ok=True) os.chmod(path,0o700) return True except: error("Create directory: permission denied.") return False #-----------------------------------------------------------# def write_binary_file(buffer, pathname): # writes a bytearray into a file debug( "Writing " + str(len(buffer)) + " bytes to " + pathname ) pathname = sanitize(pathname) if len(pathname) == 0: error("Illegal file name.") return False try: F = open(pathname,'wb') F.write(buffer) F.close() chmodf(pathname) return True except: error ("Cannot write to " + pathname) return False #-----------------------------------------------------------# def write_Fingerprint(Fingerprint): # write a SHA2 hex fingerprint string into a hash file if not os.path.exists( CertPath ): create_dir( CertPath ) debug( "Writing " + str(len(Fingerprint)) + " bytes to " + HashFile ) try: F = open(HashFile,"w") F.write(Fingerprint) F.close() chmodf(HashFile) return True except: error("Cannot write to " + HashFile) return False #-----------------------------------------------------------# def read_Fingerprint(): # read a binary fingerprint from HashFile (as hex bytearray) Hex = bytearray() if os.path.exists( HashFile ) : try: Fingerprint = "" F = open(HashFile,"r") # read content as string Fingerprint = F.read() F.close() if Fingerprint : Hex = bytearray.fromhex(Fingerprint) else: # no Fingerprint available return Hex except: # the string is not a valid hex string (baaaad) Hex.append(186) Hex.append(170) Hex.append(173) return Hex #-----------------------------------------------------------# def set_authentication(Session): global Authentication Authentication = "fingerprint" Fingerprint = read_Fingerprint() if Fingerprint: # a fingerprint value exists debug("Setting the server\'s fingerprint to " + str(Fingerprint.hex())) if (HashLength == 32) and (len(Fingerprint) == HashLength) : cryptSetAttributeString( Session, CRYPT_SESSINFO_SERVER_FINGERPRINT_SHA2, Fingerprint) return OK elif (HashLength == 20) and (len(Fingerprint) == HashLength) : cryptSetAttributeString( Session, CRYPT_SESSINFO_SERVER_FINGERPRINT_SHA1, Fingerprint) return OK else: # the fingerprint value is invalid (baaad) if len(Fingerprint) == 3 : log("The stored fingerprint is invalid.") return( ERR_CORRUPT ) log("Fingerprint : " + str(Fingerprint.hex() + " [" + str(len(Fingerprint)) + " bytes]")) error("The stored fingerprint does not match the available hash method " + HashMethod) return( ERR_CORRUPT ) else: # fingerprint authentication is not available at all, switch Authentication Authentication = "certificate" return OK #-----------------------------------------------------------# def TLS_activate( Session ): # return OK or ErrorCode, set activ TLSVersion global TLSVersion Status = -1 try: Status = cryptSetAttribute( Session, CRYPT_SESSINFO_ACTIVE, 1 ) ActTLS = cryptGetAttribute( Session, CRYPT_SESSINFO_VERSION ) if ActTLS == 2 : TLSVersion = "TLS-1.1" if ActTLS == 3 : TLSVersion = "TLS-1.2" if ActTLS == 4 : TLSVersion = "TLS-1.3" debug("Active TLS version : " + TLSVersion) return OK except CryptException as e : status, message = e.args debug("Activation error: " + str(message) + "[" + str(status) + "]") if status == CRYPT_ERROR_TIMEOUT : debug("Error: Timed out. [ERR_CONNECT]") return( ERR_CONNECT ) if (status == CRYPT_ERROR_NOTAVAIL) or (status == CRYPT_ERROR_NOSECURE) : debug("Error: Server_key_exchange failed. [ERR_CONNECT]") return( ERR_CONNECT ) if status == CRYPT_ERROR_BADDATA : debug("Error: Certificate import failed. [ERR_CORRUPT]") return( ERR_CORRUPT ) if status == CRYPT_ERROR_READ or status == CRYPT_ERROR_COMPLETE : debug("Error: TLS handshake failed. [ERR_CONNECT]") return( ERR_CONNECT ) if status == CRYPT_ERROR_WRONGKEY: if Authentication == "fingerprint": error("The server presents a certificate with a different fingerprint.") error("This maybe the result of a regular certificate update or the result of a malicious attack.") error("You need to investigate the cause. [ERR_UNTRUSTED]") error("To replace the stored fingerprint, delete the hash file " + str(HashFile)) else: debug("Error: You are connecting to the wrong server. [ERR_UNTRUSTED]") return( ERR_UNTRUSTED ) if status == CRYPT_ERROR_FAILED: debug("Error: Connecting to " + ServerName.decode() + " failed. [ERR_CONNECT]") return( ERR_CONNECT ) if (status == CRYPT_ERROR_PERMISSION) or (status == CRYPT_ERROR_OPEN) : debug("Error:Connecting to " + ServerName.decode() + " failed. Maybe the server is down? [ERR_CONNECT]") return( ERR_CONNECT ) if status == CRYPT_ERROR_INVALID: debug("Error: The server certificate is INVALID.") try: err = bytearray(b' '*200) cryptGetAttributeString( Session, CRYPT_ATTRIBUTE_ERRORMESSAGE, err ) debug("Connection Error: " + err.decode().strip() + " [ERR_INVALID]") except: pass return( ERR_INVALID ) return Status #-----------------------------------------------------------# def connect (): global TLS global CurrentSession # first try TLS-1.2, if it fails return an error code # if LTS=False and ALLOW_TLS13, then try TLS-1.3 Status = -1 CurrentSession = TLS try: Status = TLS_activate( TLS ) if Status == OK: debug("TLS-1.2 is active now !") return OK else: if ALLOW_TLS13 and not LTS: global TLS3 debug("Trying TLS 1.3 ...") CurrentSession = TLS3 Status = TLS_activate( TLS3 ) if Status == OK: debug("TLS-1.3 is active now !") return Status except: # return the exception error debug("Exception in connect() with status = " + str(Status)) return Status #-----------------------------------------------------------# def get_server_certificate( Session ) : global Certificate, encodedCertificate # get the server response try: Certificate = cryptGetAttribute( Session, CRYPT_SESSINFO_RESPONSE ) certFormatType = CRYPT_CERTFORMAT_TEXT_CERTIFICATE certMaxLength = cryptExportCert( None, 0, certFormatType, Certificate ) certMaxLength += 12 encodedCertificate = bytearray(b' '*certMaxLength) cryptExportCert( encodedCertificate, certMaxLength, certFormatType, Certificate ) return True except CryptException as e : status, message = e.args if status == CRYPT_ERROR_PERMISSION: log("The server does not return a valid certificate [" + str(status) + "]") log(message) return False #-----------------------------------------------------------# def set_cursor_to_last_CA(): global Certificate try: while True: cryptSetAttribute( Certificate, CRYPT_CERTINFO_CURRENT_CERTIFICATE, CRYPT_CURSOR_NEXT ) except: pass #-----------------------------------------------------------# def load_trusted_ROOT_CA( ROOT_DN ): # ROOT_DN is a bytearray global RootCert # derive a file name from the Root CA Distinguished Name root_dn = ROOT_DN.decode().strip().replace(" ","") ROOTFilename = TrustedPath + root_dn + ".cert" # enable the ".pem" extension and overwrite the ".cert" extension ROOTFilename2 = TrustedPath + root_dn + ".pem" if os.path.exists( ROOTFilename2 ): ROOTFilename = ROOTFilename2 if (os.path.exists( ROOTFilename )): debug("Reading a root certificate from file " + ROOTFilename) F = open(ROOTFilename,"rb") ImportCA = F.read() F.close() Root_CA_Object = cryptImportCert( ImportCA, CRYPT_UNUSED ) RootCert = int( Root_CA_Object ) # set implicit trust for this certificate cryptSetAttribute( RootCert, CRYPT_CERTINFO_TRUSTED_IMPLICIT, 1 ) return True else: debug( ROOTFilename + " does not exist.") return False #-----------------------------------------------------------# def get_cert_value(Cert, Type): value = bytearray(b' '*64) try: if Type == "serialnumber": cryptGetAttributeString( Cert, CRYPT_CERTINFO_SERIALNUMBER, value ) if Type == "subjectCN": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_SUBJECTNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_COMMONNAME, value ) if Type == "subjectO": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_SUBJECTNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_ORGANIZATIONNAME, value ) if Type == "subjectCO": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_SUBJECTNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_COUNTRYNAME, value ) if Type == "issuerCN": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_ISSUERNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_COMMONNAME, value ) if Type == "issuerO": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_ISSUERNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_ORGANIZATIONNAME, value ) if Type == "issuerCO": cryptSetAttribute( Cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_ISSUERNAME ) cryptGetAttributeString( Cert, CRYPT_CERTINFO_COUNTRYNAME, value ) if Type == "validfrom": cryptGetAttributeString( Cert, CRYPT_CERTINFO_VALIDFROM, value ) if Type == "validto": cryptGetAttributeString( Cert, CRYPT_CERTINFO_VALIDTO, value ) return value.strip() except: # exception if the type is not present in the cert pass return value.strip() #-----------------------------------------------------------# def convert_time( timebytes ) : import datetime # to convert a bytearray into a timestamp, we have to process the bytes in the reverse order. Time_str = "" T = 0 i = len(timebytes) while i > 0 : i -= 1 T = (T*256) + timebytes[i] DT = datetime.datetime.fromtimestamp( T , datetime.UTC ) Time_str = DT.strftime( '%c' ) return Time_str #-----------------------------------------------------------# def print_certchain_info(cert): import datetime debug("\nThe end-user certificate :") # fetch the end user certificate information CertList = [] cryptSetAttribute( cert, CRYPT_CERTINFO_CURRENT_CERTIFICATE, CRYPT_CURSOR_FIRST ) for T in ["serialnumber","subjectCN","subjectO","subjectCO","issuerCN","issuerO","issuerCO","validfrom","validto"]: CertList.append(get_cert_value(cert,T)) debug("Serialnumber : " + CertList[0].hex()) debug("Subject : " + CertList[1].decode()+ " "+ CertList[2].decode()+ " "+ CertList[3].decode()) debug("Issuer CA : " + CertList[4].decode()+ " "+ CertList[5].decode()+ " "+ CertList[6].decode()) item = bytearray(b' '*64) try: cryptSetAttribute( cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_SUBJECTALTNAME ) cryptGetAttributeString( Certificate, CRYPT_CERTINFO_DNSNAME, item ) debug("Subject AltName : " + str(item.strip().decode())) except: # no subject alt name in cert pass try: while True: item = bytearray(b' '*64) status=cryptSetAttribute( cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CURSOR_NEXT ) cryptGetAttributeString( cert, CRYPT_CERTINFO_DNSNAME, item ) debug("Subject AltName : " + str(item.strip().decode())) except: pass From_str = convert_time(CertList[7]) To_str = convert_time(CertList[8]) debug("Validity : " + From_str + " to " + To_str) # try next CA certificate try: while True: cryptSetAttribute( cert, CRYPT_CERTINFO_CURRENT_CERTIFICATE, CRYPT_CURSOR_NEXT ) debug("\nThe next CA certificate in the chain :") CertList = [] for T in ["serialnumber","subjectCN","subjectO","subjectCO","issuerCN","issuerO","issuerCO", "validfrom", "validto" ]: CertList.append(get_cert_value(cert,T)) debug("Serialnumber : " + CertList[0].hex()) debug("Subject : " + CertList[1].decode() + " " + CertList[2].decode() + " " + CertList[3].decode()) debug("Issuer CA : " + CertList[4].decode() + " " + CertList[5].decode() + " "+ CertList[6].decode()) From_str = convert_time(CertList[7]) To_str = convert_time(CertList[8]) debug("Validity : " + From_str + " to " + To_str) except: # last CA certificate has already been processed pass #-----------------------------------------------------------# def get_https_request(): global GETrequest GETrequest = bytearray(b'GET ') GETrequest.extend(ServerResource) GETrequest.extend(b' HTTP/1.1\r\nHOST: ') GETrequest.extend(ServerName) GETrequest.extend(b'\r\n\r\n') #-----------------------------------------------------------# def get_put_request(): global PUTrequest Length = len(UploadData) PUTrequest = bytearray(b'PUT ') PUTrequest.extend(ServerResource) PUTrequest.extend(b' HTTP/1.1\r\nHOST: ') PUTrequest.extend(ServerName) PUTrequest.extend(b'\r\n') PUTrequest.extend(b'Content-length: ') PUTrequest.extend(str(Length).encode()) PUTrequest.extend(b'\r\n\r\n') PUTrequest.extend(UploadData) #-----------------------------------------------------------# def check_certificate( cert ) : # returns OK or ErrorStatus # find the issuer CommonName of the last CA in the certificate set_cursor_to_last_CA() Root = bytearray(b' '*64) cryptSetAttribute( cert, CRYPT_ATTRIBUTE_CURRENT, CRYPT_CERTINFO_ISSUERNAME ) cryptGetAttributeString( cert, CRYPT_CERTINFO_COMMONNAME, Root ) log("Issuer CN : " + str(Root.strip().decode())) # load ROOTCA if load_trusted_ROOT_CA(Root): # a global RootCert has been found debug("Got a trusted ROOT CA certificate") # check the server cert log("Checking the server certificate (chain)") try: cryptCheckCert(cert, CRYPT_UNUSED) log("Server certificate verified successfully.") ##### RootCert created in load_trusted_ROOT_CA() ##### cryptDestroyContext( RootCert ) try: print_certchain_info( cert ) except: pass return OK except CryptException as e : status, message = e.args log("Server certificate check FAILED.") if status == CRYPT_ERROR_INVALID: log("The certificate is INVALID.") cryptDestroyContext( RootCert ) try: print_certchain_info( cert ) err = bytearray(b' '*100) cryptGetAttributeString( cert, CRYPT_ATTRIBUTE_ERRORMESSAGE, err.decode().strip() ) log(str(err.strip())) except: log("No detailed error message available.") return ERR_INVALID else: log("No trusted ROOT CA certificate available!") if ASK: log ("\nDo you wish to continue anyway? [yes|no] ") REPLY = safe_input("") if REPLY != "yes": log("Aborting the connection before sending any data.") return ERR_UNTRUSTED else: log("Continuing with an untrusted connection!") return OK return ERR_UNTRUSTED #-----------------------------------------------------------# def send_request(): bytesCopied = 0 try: if (Mode == "GET") : debug("Getting the resource ... " + ServerResource.decode()) get_https_request() bytesCopied = cryptPushData( CurrentSession, GETrequest ) else: get_put_request() log("Sending " + str(len(PUTrequest)) + " bytes") bytesCopied = cryptPushData( CurrentSession, PUTrequest ) debug ("Bytes sent to server : " + str(bytesCopied)) cryptFlushData( CurrentSession ) return bytesCopied except CryptException as e : status, message = e.args log("Error sending the request: " + message) clean() exit( ERR_REQUEST ) return bytesCopied #-----------------------------------------------------------# def get_server_response(): global MoreDataToRead # check if the connection is still active Chunk = bytearray( b' ' * BufferSize ) try: bytesCopied = cryptPopData( CurrentSession, Chunk, BufferSize ) if bytesCopied == BufferSize : MoreDataToRead = True else : MoreDataToRead = False return Chunk.strip() except CryptException as e : status, message = e.args log("Error: " + message) exit( ERR_REPLY ) return Chunk #-----------------------------------------------------------# def read_data_from_server(): # reads the server's reply and writes the data into a file # returns the received data in a bytearray NewData = bytearray() try: countBytes = 0 if create_dir( DefaultPath ): F = open(DefaultPath + "result.data","wb") # read one chunk from the server receivedData = get_server_response() NewBytes = len(receivedData) countBytes += NewBytes NewData.extend(receivedData[:NewBytes]) try: F.write(receivedData[:NewBytes]) except: error("Permission denied.") # if everything has been received, no looping required while MoreDataToRead : receivedData = get_server_response() NewBytes = len(receivedData) debug("Looping " + str(NewBytes)) countBytes += NewBytes NewData.extend(receivedData[:NewBytes]) F.write(receivedData[:NewBytes]) try: F.write(receivedData[:NewBytes]) except: error("Permission denied.") try: F.close() except: pass except CryptException as e : status, message = e.args error(message) exit ( ERR_INCOMPLETE ) return NewData #-----------------------------------------------------------# def evaluate_Header(header): global BINARY, CHUNKED, EXTENSION if "Content-Length:" in header : CHUNKED = False BINARY = True if "Transfer-Encoding: chunked" in header : CHUNKED = True BINARY = False if "Content-Type: image/png" in header: EXTENSION = ".png" if "Content-Type: image/tiff" in header: EXTENSION = ".tiff" if "Content-Type: image/jpeg" in header: EXTENSION = ".jpg" if "Content-Type: image/gif" in header: EXTENSION = ".gif" if "Content-Type: text/html" in header: EXTENSION = ".html" if "Content-Type: text/xml" in header: EXTENSION = ".xml" if "Content-Type: text/plain" in header: EXTENSION = ".txt" if "Content-Type: image/css" in header: EXTENSION = ".css" if "Content-Type: application/pdf" in header: EXTENSION = ".pdf" if "Content-Type: application/postscript" in header: EXTENSION = ".ps" if "Content-Type: application/x-gtar" in header: EXTENSION = ".tar" if "Content-Type: application/zip" in header: EXTENSION = ".zip" if "Content-Type: application/x-gzip" in header: EXTENSION = ".gz" if "Content-Type: application/pgp-signature" in header: EXTENSION = ".sig" if "Content-Type: application/x-rpm" in header: EXTENSION = ".rmp" if "Content-Type: application/x-deb" in header: EXTENSION = ".deb" #-----------------------------------------------------------# def convert_chunked( data ): # data must be bytearray type Clear = bytearray() start = end = 0 i = 0 while i < (len(data) -1 ) : if (data[i] == 13) and (data[i+1] == 10) : end = i # length information found # compute the length of the following chunk of bytes try: Length = data[start:end].decode() linelen = int(Length, 16) except: linelen = 0 pos = end + 2 if linelen : for byte in data[pos:pos+linelen] : Clear.append(byte) # skip '\r\n' i = pos+linelen start = i i += 1 return Clear #-----------------------------------------------------------# def get_result_path(): # return List [Path,File] as strings List = ServerResource.decode().split('/') Path = "" if List[len(List) -1 ]: File = List[len(List) -1 ] else: # the resource ends in /, store received data as "index.html" File = "index.html" for P in List[:-1] : if P : Path += "/" + P Path = ServerName.decode() + Path return [Path,File] ############################### MAIN ###################### if ( len(sys.argv) >= 2 ): # legitimate options are in the parameter list COMPLETE = False if "GET" in sys.argv : Mode = "GET" sys.argv.remove( "GET" ) elif "get" in sys.argv : Mode = "GET" sys.argv.remove( "get" ) elif "PUT" in sys.argv : Mode = "PUT" sys.argv.remove( "PUT" ) elif "put" in sys.argv : Mode = "PUT" sys.argv.remove( "put" ) elif "STORE" in sys.argv : Mode = "STORE" sys.argv.remove( "STORE" ) elif "store" in sys.argv : Mode = "STORE" sys.argv.remove( "store" ) elif "-help" in sys.argv : print_usage() exit( ERR_USE ) elif "-version" in sys.argv : print( Version ) exit( OK ) else: log("You need to specify an operation: GET or PUT or STORE") exit( ERR_USE ) else: print_usage() exit( ERR_USE ) # all modes are processed, get options and parameter if len(sys.argv) >= 2 : # check single options if "-silent" in sys.argv : # don't print anything to stdout LOG = True sys.argv.remove( "-silent" ) try: # open the log file LogFile = LogPath + "cltls.log" if not os.path.isfile(LogFile): create_dir( LogPath ) chmodf (LogFile) LFD = open(LogFile,"w") except: error("Cannot open the log file : " + LogFile) exit ( ERR_PERM ) if len(sys.argv) >= 2 : if "-debug" in sys.argv : DEBUG = True sys.argv.remove( "-debug" ) if len(sys.argv) >= 2 : if "-stdout" in sys.argv : # print server data to stdout and not to the file system STDOUT = True sys.argv.remove( "-stdout" ) if len(sys.argv) >= 2 : if "-noout" in sys.argv : # never print server data to the file system NOOUT = True sys.argv.remove( "-noout" ) if (len(sys.argv) >= 2) and ALLOW_TLS13 : if ("-LTS" in sys.argv) : # force TLS-1.2 only connection if TLS-1.3 is allowed LTS = True sys.argv.remove( "-LTS" ) if (len(sys.argv) >= 2) and ALLOW_TLS13 : if ("-lts" in sys.argv) : LTS = True sys.argv.remove( "-lts" ) if len(sys.argv) >= 2 : if "-redirect" in sys.argv : REDIRECT = True sys.argv.remove( "-redirect" ) if len(sys.argv) >= 2 : if "-ask" in sys.argv : ASK = True sys.argv.remove( "-ask" ) if len(sys.argv) >= 2 : # sys.argv[1] maybe a complete URL https://server/path-to/resource[?Querystring] try: List = sys.argv[1].split('//') if List[0] == "https:" : List2 = List[1].split('/') Name = sanitize(List2[0]) ServerName.extend( Name.encode() ) # make sure the server name is not empty if len(ServerName) == 0 : log("Please check the server name!") exit( ERR_CORRUPT ) HashFile = CertPath + ServerName.decode() + ".hash" CertFile = CertPath + ServerName.decode() + ".cert" if not os.path.exists( HashFile ): Authentication = "certificate" # all other list elements are part of the resource Resource = "" for Element in List2[1:] : Resource += "/" Resource += Element ServerResource.extend( sanitize( Resource ).encode() ) if not ServerResource: ServerResource.append(47) if Mode == "GET" or Mode == "STORE": COMPLETE = True # if PUT, then the upload file must be read if (Mode == "PUT") and not COMPLETE and (len(sys.argv) >= 3): UploadFile.extend( sanitize( sys.argv[2] ).encode() ) if os.path.exists (UploadFile.decode()) : try: F = open(UploadFile.decode(), 'br') UploadData = F.read() F.close() COMPLETE = True except: error("The file " + UploadFile.decode() + " cannot be read.") exit( ERR_PERM ) else: error("The file " + UploadFile.decode() + " does not exist.") exit( ERR_PERM ) except: # server and resource may be given as separate parameters COMPLETE = False if len(sys.argv) >= 2 and not COMPLETE : # extract ServerName and ServerResource from parameter list Name = sanitize( sys.argv[1] ) # do not allow / in a server name NewName = "" for char in Name: if char != '/': NewName += char ServerName.extend( NewName.encode() ) # make sure the server name is not empty if len(ServerName) == 0 : log("Please check the server name!") exit( ERR_CORRUPT ) HashFile = CertPath + ServerName.decode() + ".hash" CertFile = CertPath + ServerName.decode() + ".cert" if not os.path.exists( HashFile ): Authentication = "certificate" if Mode == "STORE" : # only the ServerName is required COMPLETE = True if (len (sys.argv) >= 3) and ((Mode == "GET") or (Mode == "PUT")) : # read the server target resource ServerResource.extend( sanitize( sys.argv[2] ).encode() ) # make sure the resource starts with a / if ServerResource[0] != 47 : ServerResource = bytearray(b'/') + ServerResource if (Mode == "GET") : # ServerName and ServerResource are OK COMPLETE = True if (Mode == "PUT") and not COMPLETE: if (len(sys.argv) >= 4) : UploadFile.extend( sanitize( sys.argv[3] ).encode() ) if os.path.exists (UploadFile.decode()) : try: F = open(UploadFile.decode(), 'br') UploadData = F.read() F.close() COMPLETE = True except: error("The file " + UploadFile.decode() + " cannot be read.") exit( ERR_PERM ) else: error("The file " + UploadFile.decode() + " does not exist.") exit( ERR_PERM ) else: log("usage: cltls PUT https://ServerName/path/filename UploadFile") log("or cltls PUT ServerName /path/filename UploadFile") exit( ERR_INCOMPLETE ) if not COMPLETE : print_usage() exit( ERR_USE ) ##### Begin Cryptlib code ##### Status = -1 try: cryptUser = CRYPT_UNUSED cryptInit() # collect randomness information cryptAddRandom( CRYPT_RANDOM_SLOWPOLL ) # get Cryptlib Version Major = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MAJORVERSION) Minor = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MINORVERSION) Step = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_STEPPING) CryptlibVersion = str(Major)+"."+str(Minor)+"."+str(Step) log( "cltls " + Version + " uses Cryptlib version " + CryptlibVersion ) if CryptlibVersion < "3.4.9": log("Using SHA-1 fingerprints because SHA-2 requires at least cryptlib 3.4.9") HashLength = 20 HashMethod = "SHA-1" # check the availability of TLS-1.3 in the library, if its use is allowed if ALLOW_TLS13: TLS_object3 = cryptCreateSession( CRYPT_UNUSED, CRYPT_SESSION_TLS ) TLS3 = int(TLS_object3) try: cryptSetAttributeString( TLS3, CRYPT_SESSINFO_SERVER_NAME, ServerName ) cryptSetAttribute( TLS3, CRYPT_SESSINFO_SERVER_PORT, ServerPort ) cryptSetAttribute( TLS3, CRYPT_SESSINFO_VERSION, 4) # TLS-1.3 # check fingerprint authentication Status = set_authentication(TLS3) debug("The current authentication method is: " + Authentication) if Status != OK : # Fingerprint authentication failed, clean is needed! error("Fingerprint authentication failed [" + str(Status) + "]") clean() exit( Status ) except: # if no TLS-1.3 support is available switch to LTS mode and block TLS-1.3 error("TLS-1.3 is not supported in this cryptlib build.") # LTS mode ensures that TLS3 is not being used anymore. LTS = True ALLOW_TLS13 = False # and destroy the TLS3 context cryptDestroyContext( TLS3 ) # create the default TLS-1.2 session object TLS_object = cryptCreateSession( CRYPT_UNUSED, CRYPT_SESSION_TLS ) TLS = int(TLS_object) try: # set the initial values cryptSetAttributeString( TLS, CRYPT_SESSINFO_SERVER_NAME, ServerName ) debug("Set server name " + ServerName.decode()) cryptSetAttribute( TLS, CRYPT_SESSINFO_SERVER_PORT, ServerPort ) cryptSetAttribute( TLS, CRYPT_SESSINFO_VERSION, 3) # TLS-1.2-LTS debug("Set server PORT " + str(ServerPort)) # check fingerprint authentication Status = set_authentication(TLS) debug("The current authentication method is: " + Authentication) if Status != OK : # Fingerprint authentication failed, clean is needed! error("Fingerprint authentication failed [" + str(Status) + "]") clean() exit( Status ) except: # no clean() allowed here error("TLS-1.2 cannot be initialised.") exit ( Status ) Status = -1 ### STORE ### if (Mode == "STORE") : log("Connecting ...") Status = connect () if not (Status == OK): error("Connection error: " + str(Status)) clean() exit(Status) if not get_server_certificate(CurrentSession) : error("Cannot store the fingerprint, no certificate in reply.") clean() exit( ERR_CORRUPT ) # get current certificate fingerprint try: Hashvalue = bytearray(b' ' * HashLength) if HashLength == 32 : cryptGetAttributeString( Certificate, CRYPT_CERTINFO_FINGERPRINT_SHA2, Hashvalue ) else: cryptGetAttributeString( Certificate, CRYPT_CERTINFO_FINGERPRINT_SHA1, Hashvalue ) except: log("Cannot determine the hash method.") exit( ERR_CORRUPT ) debug("Certificate hash value: " + Hashvalue.hex()) if os.path.exists(HashFile): try: F = open(HashFile,"r") Current = F.read() F.close() except: Current = "" if Current: log("The following fingerprint is already stored :\n") log(" " + Current) REPLY = safe_input("Overwrite this fingerprint ? [yes|no] : ") if REPLY == "yes" : # overwrite the current hex string success = write_Fingerprint(Hashvalue.hex()) if not success: clean() exit( ERR_PERM ) ServerCertificate = encodedCertificate.decode().replace(' CHAIN','') success = write_binary_file(ServerCertificate.encode(), CertFile) if not success: clean() exit( ERR_PERM ) else: log("Nothing done.") clean() exit( ERR_USE ) else: # save new fingerprint and certificate write_Fingerprint(Hashvalue.hex()) ServerCertificate = encodedCertificate.decode().replace(' CHAIN','') write_binary_file(ServerCertificate.encode(), CertFile) clean() exit( OK ) ### GET ### OR ### PUT ### if (Mode == "GET") or (Mode == "PUT") : if (Mode == "GET") : log("Requesting ... " + ServerResource.decode() + " from " + ServerName.decode()) else: log("Uploading ... " + str(len(UploadData)) + " bytes to " + ServerName.decode()) Status = connect () if not (Status == OK): error("Connection error: " + str(Status)) clean() exit(Status) # the connection has been established if Authentication == "certificate": debug("Checking the server certificate ...") if get_server_certificate(CurrentSession) : Status = check_certificate(Certificate) if Status != OK: error("Cannot continue") clean() exit( Status ) else: error( "FATAL: Got no valid certificate." ) clean() exit( ERR_CONNECT ) # at this point either fingerprint authentication has succeeded # because the server certificate matches a stored fingerprint # and the Authentication value is still "fingerprint" # or # the server's certificate could be verified using a stored # and trusted Root-CA certificate. # # If the cert verification failed, the program has already exited with # ERR_INVALID or ERR_UNTRUSTED in check_certificate( cert ), # unless the user explicitly replies "yes" when asked to continue! NumBytes = 0 NumBytes = send_request() if NumBytes == 0: # server has not accepted any bytes error("The server rejected our request.") clean() exit( ERR_REQUEST ) debug("Reading the server's reply ...") Data = bytearray() Data = read_data_from_server() log(str(len(Data)) + " bytes received from server." ) # if Content-Length: in HEADER, get proper file extension and save the bytes unchanged. # if Transfer-Encoding: chunked in HEADER, loop and remove hex lengths # analysis of the result if not Data: error("No data in header") clean() exit( ERR_INCOMPLETE ) pos = Data.find(b'\r\n\r\n', 0) if pos != -1 : # found HEADER and CONTENT Header = Data[:pos+4] Content = Data[pos+4:] else: # Server sends only the HEADER Header = Data Content = bytearray() evaluate_Header( Header.decode() ) # write header to file try: F = open(DefaultPath + "result.header","wb") F.write(Header) F.close() except: error("Permission denied : " + DefaultPath + "result.header") # check the header for errors if not "200 OK" in Header.decode() : pos = Header.find(b'\r\n') if "404 Not Found" in Header.decode() : debug("HEADER: " + Header[:pos].decode()) clean() exit( ERR_NOTFOUND ) elif ("301 Moved Permanently" in Header.decode()) or ("302 " in Header.decode()) : debug("HEADER: " + Header[:pos].decode()) if REDIRECT: # check for redirection URL and try again pos = Data.find(b'Location: ', 0) pos2 = Data.find(b'location: ', 0) if (pos != -1) or (pos2 != -1): # new resource found, check if the server name has changed, # because only redirection to the same already authenticated # server is allowed. if pos2 != -1 : pos = pos2 begin = pos + 10 end = Data.find(b'\r\n', begin) NewURL = bytearray() NewURL = Data[begin:end] if (b'https://' ) in NewURL : # remove "https://" NewURL = NewURL[8:] if (b'http://' ) in NewURL : # remove "http://" NewURL = NewURL[7:] # no ServerName in Location, check absolute path / if NewURL[0] == 47: NewURL = ServerName + NewURL # try to connect with https:// anyway log("Trying to redirect to : https://" + NewURL.decode()) pos = NewURL.find(b'/', 0) NewResource = NewURL[pos:] NewServerName = NewURL[:pos] # check if ":port" is part of the new server name and remove it index = NewServerName.find(b':',0) if index != -1 : NewServerName = NewServerName[:index] if ServerName != NewServerName : error("Redirection to a different server is not allowed.") clean() exit( ERR_DENY ) if NewResource : log("Redirecting to : "+ NewResource.decode()) ServerResource = NewResource NumBytes = 0 NumBytes = send_request() if NumBytes == 0: # server has not accepted any bytes error("The server rejected our request.") clean() exit( ERR_REQUEST ) Data = bytearray() Data = read_data_from_server() log(str(len(Data)) + " bytes received from server." ) if b'200 OK' in Data[:30] : # success pos = Data.find(b'\r\n\r\n', 0) Header = Data[:pos+4] Content = Data[pos+4:] evaluate_Header( Header.decode() ) # write header to file try: F = open(DefaultPath + "result.header","wb") F.write(Header) F.close() except: error("Permission denied : " + DefaultPath ) else: # cannot proceed without a new resource error("No redirection, because Location is missing in header.") clean() exit( ERR_DENY ) else: log("HEADER: " + Header[:pos].decode()) log("No data will be written.") clean() exit( ERR_UNKNOWN ) if "200 OK" in Header.decode() : # server returned "200 OK", write the data to a file or stdout if not STDOUT: if NOOUT: # prevent writing to the filesystem debug("Server data has been received. But it will not be written into a file.") debug("This allows complete debug information without file system modification.") else: # derive target file name from ServerResource ResultPath = get_result_path()[0] ResultFile = get_result_path()[1] if create_dir(ResultPath) : ContentFile = ResultPath + "/" + ResultFile else: # Permission error log("Cannot create the directory: " + ResultPath) log("Writing to '" + DefaultPath + "' instead !") create_dir( DefaultPath + ResultPath ) ContentFile = DefaultPath + ResultPath + "/" + ResultFile log("Writing result to " + ContentFile) try: F = open(ContentFile,"wb") if not CHUNKED: F.write(Content) else: F.write(convert_chunked(Content)) F.close() except: error("Cannot write bytes to " + ContentFile) clean() exit( ERR_PERM ) else: # write server data only to stdout # use buffer.write to write bytes to stdout if not CHUNKED: sys.stdout.buffer.write(Content) else: sys.stdout.buffer.write(convert_chunked(Content)) else: # server did not return a "200 OK" log("No valid data received from server.") pos = Header.find(b'\r\n', 0) debug("HEADER: " + Header[:pos].decode()) clean() exit( ERR_INCOMPLETE ) clean() exit( OK ) except CryptException as e : status, message = e.args error( message ) clean() exit( ERR_UNKNOWN ) exit( OK )