#!/usr/bin/python3
"""
 * File        : claes
 * Version     : 1.1
 * License     : BSD-3-Clause
 *
 * Copyright (c) 2022 - 2025
 * Ralf Senderek, Ireland.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *    This product includes software developed by Ralf Senderek.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
"""

import sys, os
from binascii import *

# error return codes
ERR_CL = -1
OK = 0
ERR_NOBYTES = 1
ERR_PERM = 2
ERR_PASSWORD = 3
ERR_INSTALL = 4
ERR_WRONGKEY = 5
ERR_DECODE = 6
ERR_SIZE = 7
ERR_ENCRYPT = 8
ERR_DECRYPT = 9
ERR_CORRUPT = 10
ERR_INCOMPLETE = 11
ERR_INPUT = 12
ERR_DATATYPE = 13

# Only a failed import means "not installed"; a bare except would also
# swallow KeyboardInterrupt and SystemExit.
try:
    from cryptlib_py import *
except ImportError:
    ERR_IMPORT = """
The python3 library is not installed.
You need to install the packages cryptlib-python3 and cryptlib.
You will find them for a variety of operating systems here:
https://senderek.ie/cryptlib
or in the Fedora repository.
"""
    print( ERR_IMPORT )
    sys.exit( ERR_INSTALL )

Version = "1.1"
DEBUG = False
BINARY = False
MaxBytes = 268400000                        # no more than INT_MAX//8 (pow(2,31)//8)
MinBufferSize = 32768                       # no less than 8192
MaxBufferSize = MaxBytes + MinBufferSize    # no more than 268435456
Source = "file"
Mode = "pgp"
InputBytes = ""
AESblocksize = 16
NumberOfBlocks = 0
FileName = ""
MinPasswordLength = 8
MaxPasswordLength = 64
DECRYPTION = False
ENVELOPE = True
KEY128 = False
PBKDFIterations = 10000                     # Openssl-enc default iteration count
COMPRESSION = False
ASKPASS = "/bin/systemd-ask-password"

if not os.path.isfile(ASKPASS) :
    print ("Error: Please install " + ASKPASS + " to ensure safe password input")
    sys.exit(ERR_INSTALL)

#-------------------------------------------------#
def print_help():
    """Print the usage text to standard output."""
    # NOTE(review): the line layout of this text was lost in the
    # whitespace-collapsed source and has been reconstructed; the wording is
    # unchanged.
    Help = """
claes encrypts or decrypts data in OpenPGP, CMS or OpenSSL format
using files or standard input with a passphrase-based AES cipher.

usage: claes [-debug] [-decrypt] [-cms | -openssl [-128]] [OPTION] [FILE | -]

If no FILE or "-" is given, data is read from standard input.
The input size is limited to 256 MByte.

OPTIONS are:

  -help      display this message
  -version   display version information
  -debug     print debugging information to stderr
  -decrypt   decrypt the input data (encrypt is the default)
  -cms       produce CMS enveloped and encrypted data instead of OpenPGP, always base64
  -openssl   produce encrypted data using pbkdf2 in openssl format
  -128       forces the use of 128 bit AES keys with -openssl (256 bits is the default)
  -binary    uses binary data for input and output
  -compress  forces compression before data is encrypted

Full documentation

This program depends on two packages providing the cryptlib shared object library
and the python3-bindings to this library.
You can download both packages in RPM and DEB format at:
https://senderek.ie/cryptlib/downloads
Or in FEDORA you can install the packages cryptlib and cryptlib-python3 directly
from the repository.
In addition the program /bin/systemd-ask-password is used to read sensible data
from stdin. This program is part of the systemd package.

INTEROPERABILITY

gpg2:
Without any options claes produces OpenPGP (base64-encoded) encrypted messages
using AES-128. It can decrypt any message produced by GnuPG (base64 or binary)
with the following ciphers: AES, AES192, AES256, 3DES and CAST-128.

openssl:
In OpenSSL mode claes writes encrypted messages in the proprietary OpenSSL format
using AES256 as the default. These messages can be decrypted with openssl :
    openssl aes-256-cbc -pbkdf2 -d -a -in FILE.asc
The use of AES-128 can be forced by the additional option -128.
"""
    print( Help )

#-----------------------------------------------------------#
def print_debug ( message ):
    """Write *message* to stderr, prefixed with "Debug: ", when DEBUG is set."""
    if DEBUG:
        sys.stderr.write( "Debug: " )
        sys.stderr.write( message + "\n" )

#-----------------------------------------------------------#
def get_proper_filename():
    """Derive the global OutFilename from FileName, Source, Mode and BINARY.

    Encrypting a file appends ".cms", ".gpg", ".ssl" or ".asc"; encrypting
    standard input always writes "./stdin.asc".  Decrypting a file strips a
    known four-character suffix, or asks the user for a name when there is
    none.  If the chosen file already exists the user is asked once whether
    to overwrite it and, if not, for a replacement name.  When decrypting
    standard input OutFilename is left as the caller set it.
    """
    global OutFilename

    KnownSuffixes = (".asc", ".cms", ".gpg", ".pgp", ".ssl")

    if not DECRYPTION :
        if Source != "file" :
            # source is standard input, write to stdin.asc
            OutFilename = "./stdin.asc"
            return
        if "cms" in Mode :
            OutFilename = FileName + ".cms"
        elif BINARY and Mode == "pgp" :
            OutFilename = FileName + ".gpg"
        elif BINARY and Mode == "openssl" :
            OutFilename = FileName + ".ssl"
        else:
            OutFilename = FileName + ".asc"
        if os.path.isfile(OutFilename) :
            RET = input("Overwrite " + OutFilename + " ? [y/n] ")
            if RET != "y" :
                OutFilename = input("File name to write : ")
        return

    # Decryption
    if Source == "file" :
        if FileName.endswith(KnownSuffixes) :
            OutFilename = FileName[:-4]
        else:
            # get a proper file name to write to
            OutFilename = input("File name to write : ")
        if os.path.isfile(OutFilename) :
            RET = input("Overwrite " + OutFilename + " ? [y/n] ")
            if RET != "y" :
                OutFilename = input("Filename to write : ")

#-----------------------------------------------------------#
def unix (command) :
    """Run *command* through the shell and return what it wrote to stdout.

    Returns an empty string on non-POSIX systems, where nothing is run.
    The command string is handed to a shell unchanged, so callers must not
    build it from untrusted text.
    """
    Result = ""
    if os.name == "posix" :
        # the context manager closes the pipe even if read() fails
        with os.popen(command, "r") as Pipe:
            Result = Pipe.read()
    return Result

#-----------------------------------------------#
CRC_24_INITIALIZATION = 0x00b704ce
CRC_24_TABLE = (
    0x000000, 0x864cfb, 0x8ad50d, 0x0c99f6, 0x93e6e1, 0x15aa1a, 0x1933ec, 0x9f7f17,
    0xa18139, 0x27cdc2, 0x2b5434, 0xad18cf, 0x3267d8, 0xb42b23, 0xb8b2d5, 0x3efe2e,
    0xc54e89, 0x430272, 0x4f9b84, 0xc9d77f, 0x56a868, 0xd0e493, 0xdc7d65, 0x5a319e,
    0x64cfb0, 0xe2834b, 0xee1abd, 0x685646, 0xf72951, 0x7165aa, 0x7dfc5c, 0xfbb0a7,
    0x0cd1e9, 0x8a9d12, 0x8604e4, 0x00481f, 0x9f3708, 0x197bf3, 0x15e205, 0x93aefe,
    0xad50d0, 0x2b1c2b, 0x2785dd, 0xa1c926, 0x3eb631, 0xb8faca, 0xb4633c, 0x322fc7,
    0xc99f60, 0x4fd39b, 0x434a6d, 0xc50696, 0x5a7981, 0xdc357a, 0xd0ac8c, 0x56e077,
    0x681e59, 0xee52a2, 0xe2cb54, 0x6487af, 0xfbf8b8, 0x7db443, 0x712db5, 0xf7614e,
    0x19a3d2, 0x9fef29, 0x9376df, 0x153a24, 0x8a4533, 0x0c09c8, 0x00903e, 0x86dcc5,
    0xb822eb, 0x3e6e10, 0x32f7e6, 0xb4bb1d, 0x2bc40a, 0xad88f1, 0xa11107, 0x275dfc,
    0xdced5b, 0x5aa1a0, 0x563856, 0xd074ad, 0x4f0bba, 0xc94741, 0xc5deb7, 0x43924c,
    0x7d6c62, 0xfb2099, 0xf7b96f, 0x71f594, 0xee8a83, 0x68c678, 0x645f8e, 0xe21375,
    0x15723b, 0x933ec0, 0x9fa736, 0x19ebcd, 0x8694da, 0x00d821, 0x0c41d7, 0x8a0d2c,
    0xb4f302, 0x32bff9, 0x3e260f, 0xb86af4, 0x2715e3, 0xa15918, 0xadc0ee, 0x2b8c15,
    0xd03cb2, 0x567049, 0x5ae9bf, 0xdca544, 0x43da53, 0xc596a8, 0xc90f5e, 0x4f43a5,
    0x71bd8b, 0xf7f170, 0xfb6886, 0x7d247d, 0xe25b6a, 0x641791, 0x688e67, 0xeec29c,
    0x3347a4, 0xb50b5f, 0xb992a9, 0x3fde52, 0xa0a145, 0x26edbe, 0x2a7448, 0xac38b3,
    0x92c69d, 0x148a66, 0x181390, 0x9e5f6b, 0x01207c, 0x876c87, 0x8bf571, 0x0db98a,
    0xf6092d, 0x7045d6, 0x7cdc20, 0xfa90db, 0x65efcc, 0xe3a337, 0xef3ac1, 0x69763a,
    0x578814, 0xd1c4ef, 0xdd5d19, 0x5b11e2, 0xc46ef5, 0x42220e, 0x4ebbf8, 0xc8f703,
    0x3f964d, 0xb9dab6, 0xb54340, 0x330fbb, 0xac70ac, 0x2a3c57, 0x26a5a1, 0xa0e95a,
    0x9e1774, 0x185b8f, 0x14c279, 0x928e82, 0x0df195, 0x8bbd6e, 0x872498, 0x016863,
    0xfad8c4, 0x7c943f, 0x700dc9, 0xf64132, 0x693e25, 0xef72de, 0xe3eb28, 0x65a7d3,
    0x5b59fd, 0xdd1506, 0xd18cf0, 0x57c00b, 0xc8bf1c, 0x4ef3e7, 0x426a11, 0xc426ea,
    0x2ae476, 0xaca88d, 0xa0317b, 0x267d80, 0xb90297, 0x3f4e6c, 0x33d79a, 0xb59b61,
    0x8b654f, 0x0d29b4, 0x01b042, 0x87fcb9, 0x1883ae, 0x9ecf55, 0x9256a3, 0x141a58,
    0xefaaff, 0x69e604, 0x657ff2, 0xe33309, 0x7c4c1e, 0xfa00e5, 0xf69913, 0x70d5e8,
    0x4e2bc6, 0xc8673d, 0xc4fecb, 0x42b230, 0xddcd27, 0x5b81dc, 0x57182a, 0xd154d1,
    0x26359f, 0xa07964, 0xace092, 0x2aac69, 0xb5d37e, 0x339f85, 0x3f0673, 0xb94a88,
    0x87b4a6, 0x01f85d, 0x0d61ab, 0x8b2d50, 0x145247, 0x921ebc, 0x9e874a, 0x18cbb1,
    0xe37b16, 0x6537ed, 0x69ae1b, 0xefe2e0, 0x709df7, 0xf6d10c, 0xfa48fa, 0x7c0401,
    0x42fa2f, 0xc4b6d4, 0xc82f22, 0x4e63d9, 0xd11cce, 0x575035, 0x5bc9c3, 0xdd8538 )

def crc24(data):
    """Return the table-driven 24-bit CRC of *data* (an iterable of byte values)."""
    # this function has been provided by Richard Mitchell
    # details: https://github.com/mitchellrj/python-pgp/blob/master/pgp/crc24.py
    result = CRC_24_INITIALIZATION
    for byte in data:
        index = ((result >> 16) ^ byte) & 0xff
        result = (CRC_24_TABLE[index] ^ (result << 8)) & 0x00ffffff
    return result

#-----------------------------------------------#
def crc24_encoding(buff):
    """Return the armor checksum line for *buff*: "=", four base64 characters, newline."""
    # input bytearray
    C = crc24( buff )
    BC = C.to_bytes(3,'big')
    return "=" + str( b2a_base64(BC).decode() )
#-----------------------------------------------#
def armor_lines(payload):
    """Return *payload* base64-encoded as text, 64 characters per line.

    Every line, including the last, ends with a newline; an empty payload
    yields a single newline.
    """
    encoded = b2a_base64(payload).rstrip(b"\n").decode()
    return "\n".join(encoded[i:i + 64] for i in range(0, len(encoded), 64)) + "\n"

#-----------------------------------------------#
def write_private(pathname, payload):
    """Write *payload* (str or bytes-like) to *pathname*, readable by the owner only.

    The file is created with mode 0600 instead of being restricted after
    the data is already on disk, and no shell is involved, so path names
    containing spaces or shell metacharacters are handled correctly.
    Raises OSError when the file cannot be written.
    """
    fd = os.open(pathname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w" if isinstance(payload, str) else "wb") as out:
        out.write(payload)
    if os.name == "posix":
        # the creation mode does not apply to a file that already existed
        os.chmod(pathname, 0o600)

#-----------------------------------------------#
def write_pgp_message(buff, pathname):
    """Write the encrypted OpenPGP data *buff* to *pathname*.

    Unless BINARY is set the data is ASCII-armored with a version header
    and a CRC-24 checksum line.  Returns True on success; on failure an
    error is printed and False is returned.
    """
    try:
        if BINARY:
            # write buff as binary
            write_private(pathname, buff)
        else:
            write_private(pathname,
                          "-----BEGIN PGP MESSAGE-----\n"
                          + "Version: claes " + Version + " with cryptlib "+ CryptlibVersion +"\n\n"
                          + armor_lines(buff)
                          + crc24_encoding(buff)
                          + "-----END PGP MESSAGE-----\n")
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
        return False
    return True

#-----------------------------------------------#
def write_openssl_message(salt, buff, pathname):
    """Write "Salted__" + *salt* + *buff* to *pathname*, base64-encoded unless BINARY.

    Returns True on success; on failure an error is printed and False is
    returned.
    """
    try:
        OUT = bytearray(b'Salted__')
        OUT.extend(salt)
        OUT.extend(buff)
        if BINARY:
            # write buff as binary
            write_private(pathname, OUT)
        else:
            write_private(pathname, armor_lines(OUT))
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
        return False
    return True

#-----------------------------------------------#
def write_cms_message(buff, pathname):
    """Write the CMS data *buff* base64-encoded between CMS header lines.

    The option -binary has no effect here.  Returns True on success; on
    failure an error is printed and False is returned.
    """
    try:
        write_private(pathname,
                      "-----BEGIN CMS-----\n"
                      + armor_lines(buff)
                      + "-----END CMS-----\n")
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
        return False
    return True

#-----------------------------------------------#
def analyze_PGP_data( data ):
    """Extract the armored body from OpenPGP input and set the global BINARY.

    Without a BEGIN line the input is treated as binary: BINARY becomes
    True and *data* is returned unchanged.  Otherwise BINARY becomes False
    and the bytes from the blank line after the armor headers up to (not
    including) the END line are returned.  An empty bytearray is returned
    when the END line or the blank line is missing.
    """
    global BINARY

    print_debug ("Trying to read OpenPGP data ...")
    PGP_BEGIN = bytearray(b"-----BEGIN PGP MESSAGE-----\n")
    PGP_END = bytearray(b"-----END PGP MESSAGE-----\n")

    start = data.find(PGP_BEGIN)
    if start < 0:
        # no BEGIN header, input treated as binary data
        BINARY = True
        return data

    end = data.find(PGP_END)
    if end < 0:
        # no END header, base64-encoded input is incomplete or truncated
        return bytearray()

    # both headers have been found
    ASCII = data[start:end]
    # skip PGP_BEGIN header and (maybe) version line
    start = ASCII.find(b'\n\n')
    if start < 0:
        # no blank line after the armor headers: treat as incomplete input
        # instead of raising ValueError
        return bytearray()
    BINARY = False
    return ASCII[start:]

#-----------------------------------------------#
def analyze_CMS_data( data ):
    """Prepare CMS or OpenSSL input for decryption and set the global BINARY.

    In OpenSSL mode *data* is returned unchanged; BINARY is set to False
    when it starts with the base64 form of "Salted__" and to True when it
    starts with the raw marker.  In CMS mode the base64 text between the
    CMS header lines is returned with ":none" appended, and an empty
    bytearray is returned when either header line is missing.
    """
    global BINARY

    SALTED = bytearray(b"U2FsdGVkX1")
    Salted = bytearray(b"Salted__")
    CMS_BEGIN = bytearray(b"-----BEGIN CMS-----\n")
    CMS_END = bytearray(b"-----END CMS-----\n")

    if Mode == "openssl" :
        print_debug ("Trying to read OpenSSL data")
        if data[:10] == SALTED:
            # found base64 encoded data
            print_debug("Found base64-encoded OpenSSL data")
            BINARY = False
        elif data[:8] == Salted :
            # data starts with "Salted__"
            print_debug("Found binary OpenSSL data")
            BINARY = True
        return data

    # CMS data is treated as base64 always
    print_debug ("Trying to read CMS data")
    start = data.find(CMS_BEGIN)
    end = data.find(CMS_END)
    if start < 0 or end < 0:
        # no headers, input is incomplete or corrupt
        return bytearray()
    ASCII = bytearray(data[start+len(CMS_BEGIN):end])
    BINARY = False
    # add ':none' as a dummy CRC checksum that gets removed in the
    # decryption section, so that CMS and PGP data look similar
    ASCII.extend(b":none")
    return ASCII

#-----------------------------------------------------------#
def get_random_bytes ( num ):
    """Return *num* bytes of filler used to replace sensitive buffers.

    This function does not need to produce cryptographically secure random
    numbers.  random.randbytes is used when it exists; otherwise a buffer
    of spaces is encrypted under a throw-away cryptlib AES key.
    """
    try:
        from random import randbytes
    except ImportError:
        RandomBuffer = bytearray(b' '*num)
        RandomContext_object = cryptCreateContext( cryptUser, CRYPT_ALGO_AES )
        RandomContext = int( RandomContext_object )
        cryptSetAttribute( RandomContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CFB )
        cryptGenerateKey( RandomContext )
        cryptEncrypt( RandomContext, RandomBuffer )
        cryptDestroyContext( RandomContext )
        return RandomBuffer
    return randbytes( num )

#-----------------------------------------------------------#
def pbkdf2 ( salt ):
    """Derive the AES key followed by the IV from the global password and *salt*.

    Uses PBKDF2-HMAC-SHA256 with PBKDFIterations rounds.  Returns a
    bytearray of 48 bytes, or 32 bytes when KEY128 is set (the callers take
    the last 16 bytes as IV).  Returns "" outside OpenSSL mode or when
    *salt* is empty.
    """
    from hashlib import pbkdf2_hmac

    # the password is available globally
    if ( Mode == "openssl" ) and salt :
        # generate session key and iv from password and salt using pbkdf2,
        # 256 bit AES is the default; derive once with the length needed
        Length = 32 if KEY128 else 48
        return bytearray( pbkdf2_hmac('sha256', password, salt, PBKDFIterations, Length) )
    return ""

#-----------------------------------------------------------#
def first_token(buffer):
    """Return the bytes of *buffer* before the first space or NUL byte (all of it if none)."""
    return bytes(buffer).split(b' ', 1)[0].split(b'\x00', 1)[0]

#-----------------------------------------------------------#
def print_cipher_info(handle):
    """Print "<algorithm> <key bits> <mode>" for the cryptlib object *handle*."""
    # NOTE(review): the width of the original space-filled buffers was lost
    # in the whitespace-collapsed source.  64 is assumed to be large enough
    # for any cryptlib text attribute (CRYPT_MAX_TEXTSIZE) - confirm.
    ALGO = bytearray(b' ' * 64)
    cryptGetAttributeString( handle, CRYPT_CTXINFO_NAME_ALGO, ALGO )
    KEYSIZE = cryptGetAttribute( handle, CRYPT_CTXINFO_KEYSIZE )
    MODE = bytearray(b' ' * 64)
    cryptGetAttributeString( handle, CRYPT_CTXINFO_NAME_MODE, MODE )
    print( first_token(ALGO).decode() + " " + str(KEYSIZE*8) + " " + first_token(MODE).decode() )

#-----------------------------------------------------------#
def envelope_info():
    """Print algorithm, key size in bits and mode of the global Envelope."""
    print_cipher_info( Envelope )

#-----------------------------------------------------------#
def context_info():
    """Print algorithm, key size in bits and mode of the global AESContext."""
    print_cipher_info( AESContext )

#-----------------------------------------------------------#
def clean_envelope():
    """Replace the global password and release the envelope and cryptlib; errors are ignored."""
    global Envelope
    global password

    # cryptEnd() deliberately shares the try block with the destroy call:
    # it is skipped when an earlier statement raises.
    try:
        print_debug("Cleaning envelope before exit")
        password = get_random_bytes( len(password) )
        cryptDestroyEnvelope( Envelope )
        cryptEnd()
    except Exception:
        pass

#-----------------------------------------------------------#
def clean_context():
    """Replace the global password and release the AES context and cryptlib; errors are ignored."""
    global AESContext
    global password

    # cryptEnd() deliberately shares the try block with the destroy call.
    # The main script calls this after it has destroyed the context itself
    # and still runs its own cryptDestroyEnvelope()/cryptEnd() afterwards -
    # presumably the second destroy fails and keeps cryptlib alive until
    # then; do not split this into separate try blocks without checking.
    try:
        print_debug("Cleaning context before exit")
        password = get_random_bytes( len(password) )
        cryptDestroyContext( AESContext )
        cryptEnd()
    except Exception:
        pass

#############################################################

if len(sys.argv) > 1 :
    # legitimate options or a file name is in the parameter list;
    # every recognised option is removed so that only FILE or "-" remains
    if "-debug" in sys.argv :
        DEBUG = True
        sys.argv.remove( "-debug" )
    if "-cms" in sys.argv :
        Mode = "cms"
        sys.argv.remove( "-cms" )
    if "-openssl" in sys.argv :
        # OpenSSL format is produced with a plain AES context, not an envelope
        Mode = "openssl"
        ENVELOPE = False
        sys.argv.remove( "-openssl" )
    if "-128" in sys.argv :
        KEY128 = True
        sys.argv.remove( "-128" )
    if "-help" in sys.argv :
        print_help()
        sys.exit( OK )
    if "-version" in sys.argv :
        print ( Version )
        sys.exit( OK )
    if "-binary" in sys.argv :
        BINARY = True
        sys.argv.remove( "-binary" )
    if "-decrypt" in sys.argv :
        DECRYPTION = True
        sys.argv.remove( "-decrypt" )
    if "-compress" in sys.argv :
        COMPRESSION = True
        sys.argv.remove( "-compress" )

# Allow larger input for base-64 encoded text.
# If the decoded binary input is greater than MaxBytes it will be rejected
# later in the encryption section or in the decryption section.
# Base64 text is read up to 4/3 of the binary limit.
ReadLimit = MaxBytes if BINARY else MaxBytes * 4 // 3

if len(sys.argv) > 1 and sys.argv[1] != "-" :
    # the first remaining argument names the input file
    if not os.path.isfile(sys.argv[1]) :
        print ("No such file: " + sys.argv[1] )
        sys.exit ( ERR_INPUT )
    FileName = sys.argv[1]
    try:
        # the context manager closes the file even when reading fails
        with open( FileName, "rb" ) as F:
            InputBytes = F.read( ReadLimit )
    except (OSError, MemoryError):
        print( "Cannot open file " + str(FileName) )
        sys.exit ( ERR_PERM )
else:
    # "-" or no file argument at all: read standard input
    try:
        InputBytes = sys.stdin.buffer.read( ReadLimit )
    except (OSError, MemoryError):
        sys.exit ( ERR_PERM )
    Source = "stdin"
    FileName = "stdin"

OutFilename = FileName
get_proper_filename ()

##### Begin Cryptlib code #####

cryptInit()
try:
    cryptAddRandom( CRYPT_RANDOM_SLOWPOLL )
except CryptException as e :
    # not fatal: only warn
    status, message = e.args
    print("Warning: RANDOM_SLOWPOLL failed. [" + message + "]")

# get Cryptlib Version
Major = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MAJORVERSION)
Minor = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MINORVERSION)
Step = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_STEPPING)
CryptlibVersion = str(Major)+"."+str(Minor)+"."+str(Step)

cryptUser = CRYPT_UNUSED

# Encryption needs an envelope of the requested format; for decryption
# cryptlib is asked to detect the format itself.
if DECRYPTION :
    Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_AUTO )
elif Mode == "pgp" :
    Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_PGP )
else:
    Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_CMS )

# now an Envelope_object exists
try:
    Envelope = int( Envelope_object )
except Exception:
    print ("Cryptlib error.")
    cryptEnd()
    sys.exit (ERR_CL)

# get Data.
# Data must be modifiable Buffer
Data = bytearray()
Data.extend( InputBytes )
# randomize InputBytes
InputBytes = get_random_bytes( len (InputBytes) )

# force integrity protection for all encryption
if not DECRYPTION:
    cryptSetAttribute( Envelope, CRYPT_ENVINFO_INTEGRITY, CRYPT_INTEGRITY_FULL )

# Prompt the user for a passphrase of sufficient quality
# because either encryption or decryption will use it anyway
password = bytearray()
password.extend( unix(ASKPASS).encode() )
# Drop the line terminator BEFORE measuring the length; counting it let
# 7-character passphrases through and rejected 64-character ones.
if password.endswith(b"\n") :
    del password[-1:]
if DECRYPTION :
    # The minimum length is a quality gate for new ciphertext only; data
    # that already exists must stay decryptable whatever its passphrase.
    PasswordAccepted = 0 < len( password ) <= MaxPasswordLength
else:
    PasswordAccepted = MinPasswordLength <= len( password ) <= MaxPasswordLength

if PasswordAccepted :
    if ( ENVELOPE and (not DECRYPTION) ) :
        try:
            # add the encryption password to the envelope
            cryptSetAttributeString( Envelope, CRYPT_ENVINFO_PASSWORD, password )
        except CryptException as e :
            status, message = e.args
            if (status == CRYPT_ERROR_WRONGKEY) :
                print("Error: " + message)
                password = get_random_bytes( len(password) )
                clean_envelope()
                sys.exit( ERR_WRONGKEY )
        # randomize password buffer as it is no longer needed
        password = get_random_bytes( len(password) )
        print_debug (str(len(password)) + " bytes used as password")
else:
    print ("Error: Your password must have at least " + str(MinPasswordLength) + " characters. Nothing done.")
    clean_envelope()
    # terminate the program
    sys.exit (ERR_PASSWORD)

if (not DECRYPTION) :
    # ENCRYPTION
    # expand the internal buffer which is set to 32K by default. This limits the input data size.
    try:
        cryptSetAttribute( Envelope, CRYPT_ATTRIBUTE_BUFFERSIZE, MaxBufferSize )
    except CryptException as e :
        status, message = e.args
        if status != CRYPT_ENVELOPE_RESOURCE :
            print( "Error: cannot set BufferSize to " + str(MaxBufferSize) )
            clean_envelope()
            sys.exit( ERR_SIZE )

    # encrypt the input
    if Data :
        print ( "Performing encryption of input data" )
    else:
        print( "Your message is empty. Nothing to encrypt." )
        clean_envelope()
        sys.exit( ERR_SIZE )

    Buffer = bytearray()
    if (ENVELOPE) :
        # the password has been supplied already to the envelope
        # if PGP or CMS, check if data must be compressed before encryption
        if COMPRESSION :
            try:
                print_debug( "Compressing input data ..." )
                # scratch buffer; its content is overwritten by cryptPopData
                compressed = bytearray( len(Data) + MinBufferSize )
                if Mode == "pgp" :
                    Compress_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_PGP )
                else:
                    Compress_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_CMS )
                CompressEnvelope = int ( Compress_object )
                cryptSetAttribute( CompressEnvelope, CRYPT_ATTRIBUTE_BUFFERSIZE, MaxBufferSize )
                cryptSetAttribute( CompressEnvelope, CRYPT_ENVINFO_COMPRESSION, CRYPT_UNUSED )
                cryptSetAttribute( CompressEnvelope, CRYPT_ENVINFO_DATASIZE, len(Data) )
                bytesCopied = cryptPushData( CompressEnvelope, Data )
                cryptFlushData( CompressEnvelope )
                bytesCopied = cryptPopData( CompressEnvelope, compressed, MaxBufferSize )
                print_debug("Retrieved " + str(bytesCopied) + " bytes of compressed data")
                # overwrite Data with the compressed data
                Data = compressed[:bytesCopied]
                # inform the encryption envelope that Data is compressed
                cryptSetAttribute( Envelope, CRYPT_ENVINFO_CONTENTTYPE, CRYPT_CONTENT_COMPRESSEDDATA )
                # randomize compressed data
                compressed = get_random_bytes( len(compressed) )
                cryptDestroyEnvelope( CompressEnvelope )
            except CryptException as e :
                # compression is best effort: report and go on with Data as it is
                status, message = e.args
                print_debug( "Compression error: " + message )
                print( "Compression of input data failed." )

        if (len( Data ) < MaxBytes) :
            # we only need one pass, no looping required
            print_debug ( "Processing " + str( len( Data)) + " bytes of input data")
            try:
                cryptSetAttribute( Envelope, CRYPT_ENVINFO_DATASIZE, len( Data ) )
                bytesCopied = cryptPushData( Envelope, Data )
            except CryptException as e :
                status, message = e.args
                if status != CRYPT_ENVELOPE_RESOURCE :
                    print( "Your message is too large. The limit is " + str(MaxBufferSize-MinBufferSize) )
                    clean_envelope()
                    sys.exit( ERR_SIZE )
            print_debug ("Pushed " +str(bytesCopied) + " bytes into the envelope")
            if len( Data ) != bytesCopied :
                # inform the user and proceed
                print ( "WARNING: message did not fit into the envelope completely." )
            try:
                cryptFlushData( Envelope )
            except CryptException as e :
                status, message = e.args
                if status != CRYPT_ENVELOPE_RESOURCE :
                    print( "Encryption error: " + message )
                    clean_envelope()
                    sys.exit( ERR_ENCRYPT )
            # randomize cleartext data
            Data = get_random_bytes( len (Data) )

            # prepare the cryptogram
            DataBufferSize = MaxBytes
            envelopedData = bytearray( DataBufferSize )
            bytesCopied = cryptPopData( Envelope, envelopedData, DataBufferSize )
            print_debug ("Retrieving " + str(bytesCopied) + " encrypted bytes from envelope")
            # Buffer holds the encrypted data
            Buffer = envelopedData[:bytesCopied]
        else:
            print( "Input is too big" )
            clean_envelope()
            sys.exit( ERR_SIZE )
    else:
        # USE ONLY CRYPT-CONTEXT and NO ENVELOPES
        if (Mode == "openssl") :
            if len(Data) >= MaxBytes:
                # clean password before exit
                password = get_random_bytes( len(password) )
                print("Input is too large")
                sys.exit( ERR_SIZE )

            # get an AEScontext from a password and salt
            crypt_object = cryptCreateContext( cryptUser , CRYPT_ALGO_AES )
            AESContext = int ( crypt_object )
            # get 8 bytes of random data for the salt
            SALT = bytearray()
            # USE internal cryptContext to generate SALT: a fixed block is
            # encrypted under a freshly generated throw-away key
            try:
                SaltBuffer = bytearray(b'Cryptlib')
                SaltContext_object = cryptCreateContext( cryptUser, CRYPT_ALGO_AES )
                SaltContext = int( SaltContext_object )
                cryptSetAttribute( SaltContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CFB )
                cryptGenerateKey( SaltContext )
                cryptEncrypt( SaltContext, SaltBuffer )
                cryptDestroyContext( SaltContext )
                SALT.extend(SaltBuffer[:8])
            except CryptException as e :
                # SALT stays empty, pbkdf2() then returns "" and nothing is encrypted
                status, message = e.args
                print( "CL random failed : " + message )

            keyandiv = pbkdf2( SALT )
            sessionkey = keyandiv[:32]
            iv = keyandiv[32:]
            if (KEY128) :
                sessionkey = keyandiv[:16]
                iv = keyandiv[16:]

            # make sure that len(Data) is a multiple of the AES Blocksize
            # use PKCS#7 padding: 1..16 bytes, each holding the pad length
            NumberOfBlocks = int( len(Data) // AESblocksize )
            reminder = int ( AESblocksize - (len(Data) % AESblocksize) )
            Data.extend( bytes([reminder]) * reminder )
            NumberOfBlocks = NumberOfBlocks + 1

            if (sessionkey) :
                print_debug( "Generated session key and iv")
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CBC )
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_KEYSIZE, AESblocksize )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_KEY, sessionkey )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_IV, iv )

                # encrypt Data in the context
                try:
                    status = cryptEncrypt( AESContext, Data )
                except CryptException as e :
                    status, message = e.args
                    print_debug( "Encryption error while encrypting data ...")
                    print_debug(str(status) + message )
                    clean_context()
                    sys.exit( ERR_ENCRYPT )

                # if encryption was successful, the encrypted data is in-place
                cryptDestroyContext( AESContext )
                print_debug( "Retrieving " + str(len(Data)) + " encrypted bytes" )
                # Buffer holds the encrypted data
                Buffer = Data
                sessionkey = get_random_bytes( len(sessionkey) )
            clean_context()

    if ( Buffer ) :
        # write the encrypted Buffer into the file system
        print("Writing " + OutFilename)
        if Mode == "pgp" :
            Written = write_pgp_message(Buffer , OutFilename)
        elif Mode == "openssl" :
            Written = write_openssl_message(SALT, Buffer , OutFilename)
        else :
            Written = write_cms_message(Buffer , OutFilename)
        # A writer that reports failure by returning False must not end in
        # exit code OK; writers that return nothing behave as before.
        if Written is False :
            clean_envelope()
            sys.exit( ERR_PERM )
    else:
        # ENCRYPTION failed
        print ("Encryption failed.")
        sys.exit ( ERR_ENCRYPT )

else:
    # DECRYPTION
    print ("Performing decryption of input data")
    # get the raw data from ascii armored Data
    print_debug( "decrypting " + str(len(Data)) + " bytes of input" )
    if ( Mode == "pgp" ) :
        Data = analyze_PGP_data( Data )
    else:
        Data = analyze_CMS_data( Data )

    Cleartext = bytearray()
    Buffer = bytearray()
    if (ENVELOPE) :
        # the password has been supplied but not added to the decryption envelope
        if len(Data) == 0 :
            print("Incomplete base64 encoded input.")
            if BINARY :
                print("Please do not use -binary with CMS data.")

        if (len( Data ) <= MaxBytes * 4 // 3) :
            # we only need one pass, no looping required
            if (not BINARY) :
                # the last five bytes hold the checksum (or its ":none" stand-in)
                ASCII = Data[:-5]
                CRC = Data[-5:-1]
                # base64 decode ASCII
                try:
                    Buffer = a2b_base64( ASCII )
                except Exception:
                    print ( "Error: cannot decode message block" )
                    clean_envelope()
                    sys.exit (ERR_DECODE)
            else:
                if (len( Data ) < MaxBytes) :
                    # correct binary input
                    Buffer = Data
                else:
                    # if len(Data) = MaxBytes not all binary data has been read
                    print ( "Error: binary input is too large." )
                    clean_envelope()
                    sys.exit (ERR_SIZE)

            print_debug ( "Processing " + str( len( Buffer ) ) + " bytes of input data" )
            if (Mode == "pgp") and (not BINARY) :
                # check the CRC24 on the blob; a mismatch is reported only
                Checksum = bytearray()
                S = crc24_encoding( Buffer )[1:-1]
                Checksum.extend( S.encode() )
                if ( Checksum != CRC ) :
                    print( "CRC integrity check failed." )

            # expand the internal buffer which is set to 32K by default. This limits the input data size
            try:
                cryptSetAttribute( Envelope, CRYPT_ATTRIBUTE_BUFFERSIZE, MaxBufferSize )
            except CryptException as e :
                status, message = e.args
                if status != CRYPT_ENVELOPE_RESOURCE :
                    print( "Cannot set BufferSize to " + str(MaxBufferSize) )
                    clean_envelope()
                    sys.exit( ERR_SIZE )

            # push buffer into envelope
            bytesCopied = 0
            if ( len(Buffer) > 0 ) :
                try:
                    bytesCopied = cryptPushData( Envelope, Buffer )
                except CryptException as e :
                    # catch the advisory exception, that the key is still missing
                    status, message = e.args
                    if status != CRYPT_ENVELOPE_RESOURCE :
                        print( "Decryption error while pushing bytes into the envelope. " + message )
                        print( "Possibly inconsistent PGP message or unsupported crypto." )
                        clean_envelope()
                        sys.exit( ERR_DECRYPT )
            else:
                # nothing to decrypt
                print( "Error: no valid input found." )
                clean_envelope()
                sys.exit( ERR_DECODE )

            try:
                status = cryptFlushData( Envelope )
                print_debug( "Flushed." )
            except CryptException as e :
                status, message = e.args
                print_debug( "Flushing data ... " + message )
                if (status == CRYPT_ERROR_WRONGKEY) :
                    print( "Error: " + message )
                    clean_envelope()
                    sys.exit( ERR_WRONGKEY )

            # insert the password
            try:
                cryptSetAttributeString( Envelope, CRYPT_ENVINFO_PASSWORD, password )
                # randomize password buffer as it is no longer needed
                password = get_random_bytes( len(password) )
                print_debug ( str(len(password)) + " bytes used as password" )
            except CryptException as e :
                status, message = e.args
                if (status == CRYPT_ERROR_WRONGKEY) :
                    print( "Error: " + message )
                    clean_envelope()
                    sys.exit( ERR_WRONGKEY )

            # check the ALGO used
            try:
                envelope_info()
            except Exception:
                print_debug("No ALGO info available")

            DataBufferSize = MaxBytes
            Cleartext = bytearray( DataBufferSize )
            try:
                bytesCopied = cryptPopData( Envelope, Cleartext, DataBufferSize )
            except CryptException as e :
                status, message = e.args
                if status == CRYPT_ERROR_SIGNATURE:
                    print( "Violated message integrity detected. Aborting decryption." )
                else:
                    print( "Decryption error: " + message )
                cryptDestroyEnvelope( Envelope )
                cryptEnd()
                sys.exit( ERR_DECRYPT )

            print_debug ( str(bytesCopied) + " decrypted bytes retrieved from envelope" )
            try:
                # this fails, if there is no integrity protection
                Verification = cryptGetAttribute( Envelope, CRYPT_ENVINFO_SIGNATURE_RESULT )
                if (Verification == 0) :
                    print_debug("Full integrity protection found.")
                else:
                    print_debug("No integrity protection. [" + str(Verification) + "]")
            except Exception:
                print_debug("No integrity check available.")

            # Cleartext holds the decrypted data
            Cleartext = Cleartext[:bytesCopied]

            ContentType = cryptGetAttribute( Envelope, CRYPT_ENVINFO_CONTENTTYPE )
            if (ContentType != CRYPT_CONTENT_DATA) :
                if (ContentType == CRYPT_CONTENT_COMPRESSEDDATA) :
                    print_debug( "Found compressed data." )
                    try:
                        BufferSize = MaxBufferSize
                        Decompressed = bytearray( BufferSize )
                        DeCompress_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_AUTO )
                        DeCompressEnvelope = int ( DeCompress_object )
                        cryptSetAttribute( DeCompressEnvelope, CRYPT_ATTRIBUTE_BUFFERSIZE, BufferSize )
                        bytesCopied = cryptPushData( DeCompressEnvelope, Cleartext )
                        cryptFlushData( DeCompressEnvelope )
                        bytesCopied = cryptPopData( DeCompressEnvelope, Decompressed, BufferSize )
                        print_debug("Retrieved " + str(bytesCopied) + " bytes of clear text")
                        Cleartext = Decompressed[:bytesCopied]
                        cryptDestroyEnvelope( DeCompressEnvelope )
                        if (bytesCopied >= BufferSize) :
                            # not all clear text could be de-compressed
                            print( "Warning: Not all clear text can be de-compressed." )
                            print_debug( "Warning: The clear text file has been truncated." )
                        else:
                            print_debug( "Decompression successful" )
                    except CryptException as e :
                        status, message = e.args
                        print( "Decryption error: " + message )
                        print_debug( "Decompression of decrypted data failed." )
                        clean_envelope()
                        sys.exit ( ERR_DECRYPT )
                else:
                    print_debug("Other non-compressed data found. Aborting ...")
                    clean_envelope()
                    sys.exit ( ERR_DATATYPE )
        else:
            print_debug( "Input is too large." )
            clean_envelope()
            sys.exit( ERR_SIZE )
    else:
        # USE ONLY A AES CONTEXT to decrypt data
        if ( Mode == "openssl" ) :
            # get an AEScontext from a password and salt
            crypt_object = cryptCreateContext( cryptUser , CRYPT_ALGO_AES )
            AESContext = int ( crypt_object )

            # decode the input
            if ( not BINARY ) :
                if (len(Data) < MaxBytes * 4 // 3):
                    try:
                        Buffer = a2b_base64( Data )
                    except Exception:
                        print ( "Error: cannot decode message block of " + str(len(Data)) + " bytes." )
                        clean_context()
                        sys.exit ( ERR_DECODE )
                else:
                    print( "Input is too large." )
                    clean_context()
                    sys.exit ( ERR_SIZE )
            else:
                if (len(Data) < MaxBytes) :
                    Buffer = Data
                else:
                    print ( "Error: Input is too large." )
                    clean_context()
                    sys.exit ( ERR_SIZE )

            print_debug ( "Processing " + str( len( Buffer ) ) + " bytes of input data" )
            # read the salt from Buffer: "Salted__", 8 bytes of salt, ciphertext
            SALT = bytearray()
            CryptoBuffer = bytearray()
            if ( ( Buffer[:8] == b'Salted__' ) and (len (Buffer) > 1) ) :
                SALT = Buffer[8:16]
                # one slice instead of a byte-by-byte copy loop
                CryptoBuffer = bytearray( Buffer[16:] )
            else:
                print( "Decryption error: inconsistent encrypted message format." )
                clean_context()
                sys.exit ( ERR_CORRUPT )

            keyandiv = pbkdf2( SALT )
            sessionkey = keyandiv[:32]
            iv = keyandiv[32:]
            if ( KEY128 ) :
                sessionkey = keyandiv[:16]
                iv = keyandiv[16:]

            if (sessionkey) :
                print_debug( "Found session key and iv")
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CBC )
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_KEYSIZE, AESblocksize )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_IV, iv )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_KEY, sessionkey )

                # decrypt Data in the context
                try:
                    # padded ciphertext is never empty and always whole blocks;
                    # the emptiness test also protects the padding lookup below
                    if ( len(CryptoBuffer) == 0 ) or ( (len(CryptoBuffer) % AESblocksize ) != 0 ) :
                        print( "This input data may be corrupt" )
                        clean_context()
                        sys.exit ( ERR_CORRUPT )
                    cryptDecrypt( AESContext, CryptoBuffer )
                    # CryptoBuffer holds the decrypted clear text and must be randomized below
                except CryptException as e :
                    status, message = e.args
                    print_debug( "Decryption error while decrypting data ... " + message )
                    clean_context()
                    sys.exit( ERR_DECRYPT )

                context_info()
                # if decryption was successful, the decrypted data is in-place
                cryptDestroyContext( AESContext )
                print_debug ( "Retrieving " + str(len(CryptoBuffer)) + " decrypted bytes." )
                # use PKCS#7 padding to remove padding bytes from the tail of the clear text
                Last = int( CryptoBuffer[-1] )
                # a valid pad length is 1..16; 0 would otherwise pass garbage through
                if Last < 1 or Last > AESblocksize :
                    print ( "This data is not AES-256 encrypted." )
                    print ( "Please try the option -128 for AES encryption" )
                    clean_context()
                    sys.exit ( ERR_DECRYPT )
                del CryptoBuffer[-Last:]
                # Cleartext holds the decrypted data
                Cleartext = CryptoBuffer
                # randomize Buffer
                CryptoBuffer = get_random_bytes( len (CryptoBuffer) )
                sessionkey = get_random_bytes( len(sessionkey) )
            clean_context()

    if (Cleartext) :
        # write decrypted bytes to file system
        try:
            # Create the file owner-only from the start instead of
            # restricting it after the clear text is on disk, and without a
            # shell, so any file name is handled correctly.
            Descriptor = os.open( OutFilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600 )
            with os.fdopen( Descriptor, "wb" ) as F:
                F.write( Cleartext )
            if os.name == "posix" :
                # the creation mode does not apply to a file that already existed
                os.chmod( OutFilename, 0o600 )
            print("Clear text written to " + OutFilename)
        except Exception:
            print( "Error: cannot write decrypted bytes to " + OutFilename )
            clean_envelope()
            sys.exit( ERR_PERM )
    else:
        # DECRYPTION failed.
        print( "Decryption failed." )
        clean_envelope()
        sys.exit ( ERR_DECRYPT )

    # randomize Data
    Cleartext = get_random_bytes( len (Cleartext) )

# normal clean up
password = get_random_bytes( len(password) )
del password
try:
    cryptDestroyEnvelope( Envelope )
    cryptEnd()
except CryptException as e :
    status, message = e.args
    print("Error: " + message)
    sys.exit ( ERR_INCOMPLETE )

sys.exit( OK )