#!/usr/bin/env python3
# group: rw
#
# Test case for QMP's encrypted key management
#
# Copyright (C) 2019 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import iotests
import os
import time
import json

test_img = os.path.join(iotests.test_dir, 'test.img')


class Secret:
    """A numbered test secret, renderable for qemu-img or for QMP."""

    def __init__(self, index):
        """Derive the secret's object id and its payload from *index*."""
        self._id = "keysec" + str(index)
        # you are not supposed to see the password...
        self._secret = "hunter" + str(index)

    def id(self):
        """Return the object id of this secret."""
        return self._id

    def secret(self):
        """Return the secret payload (the passphrase)."""
        return self._secret

    def to_cmdline_object(self):
        """Return a one-element list holding the '--object' definition."""
        return ["secret,id=" + self._id + ",data=" + self._secret]

    def to_qmp_object(self):
        """Return the keyword arguments for a QMP 'object-add' command."""
        return {
            "qom_type": "secret",
            "id": self.id(),
            "data": self.secret(),
        }


################################################################################
class EncryptionSetupTestCase(iotests.QMPTestCase):
    """Add and erase encryption keys of an image through x-blockdev-amend."""

    # test case startup
    def setUp(self):
        """Launch the VM, register six secrets, and pick per-format options."""
        # start the VM
        self.vm = iotests.VM()
        self.vm.launch()

        # create the secrets and load 'em into the VM
        self.secrets = [Secret(i) for i in range(6)]
        for secret in self.secrets:
            result = self.vm.qmp("object-add", **secret.to_qmp_object())
            self.assert_qmp(result, 'return', {})

        # for qcow2 the encryption options are prefixed with "encrypt." and
        # the encryption format has to be requested explicitly
        if iotests.imgfmt == "qcow2":
            self.pfx = "encrypt."
            self.img_opts = ['-o', "encrypt.format=luks"]
        else:
            self.pfx = ""
            self.img_opts = []

    # test case shutdown
    def tearDown(self):
        """Shut the VM down."""
        # stop the VM
        self.vm.shutdown()

    ###########################################################################
    # create the encrypted block device
    def createImg(self, file, secret):
        """Create a 1M encrypted image at *file*, unlocked by *secret*."""
        iotests.qemu_img(
            'create',
            '--object', *secret.to_cmdline_object(),
            '-f', iotests.imgfmt,
            '-o', self.pfx + 'key-secret=' + secret.id(),
            '-o', self.pfx + 'iter-time=10',
            *self.img_opts,
            file,
            '1M')

    ###########################################################################
    # open an encrypted block device
    def openImageQmp(self, id, file, secret, read_only = False):
        """Attach image *file* as node *id* via 'blockdev-add'.

        *secret* is the Secret used to unlock the image; *read_only* is
        passed through as the node's 'read-only' option.
        """
        encrypt_options = {'key-secret': secret.id()}

        # qcow2 nests the encryption options under 'encrypt'
        if iotests.imgfmt == "qcow2":
            encrypt_options = {
                'encrypt': {'format': 'luks', **encrypt_options},
            }

        result = self.vm.qmp('blockdev-add', **{
            'driver': iotests.imgfmt,
            'node-name': id,
            'read-only': read_only,
            **encrypt_options,
            'file': {
                'driver': 'file',
                # honour the caller's path rather than the global test_img
                'filename': file,
            },
        })
        self.assert_qmp(result, 'return', {})

    # close the encrypted block device
    def closeImageQmp(self, id):
        """Detach node *id* via 'blockdev-del'."""
        result = self.vm.qmp('blockdev-del', **{'node-name': id})
        self.assert_qmp(result, 'return', {})

    ###########################################################################
    # shared implementation of addKeyQmp/eraseKeyQmp
    def _amendKeyslots(self, id, job_id, crypt_options, force):
        """Submit an 'x-blockdev-amend' job on node *id* and run it.

        *crypt_options* are the keyslot options; for qcow2 they get wrapped
        into an 'encrypt' dict with the 'luks' format added.  'force' is only
        sent to QEMU when *force* is true.
        """
        if iotests.imgfmt == "qcow2":
            crypt_options = {
                'encrypt': {**crypt_options, 'format': 'luks'},
            }

        args = {
            'node-name': id,
            'job-id': job_id,
            'options': {
                'driver': iotests.imgfmt,
                **crypt_options,
            },
        }
        if force:
            args['force'] = True

        # TODO: check what jobs return
        result = self.vm.qmp('x-blockdev-amend', **args)
        self.assert_qmp(result, 'return', {})
        self.vm.run_job(job_id)

    # add a key to an encrypted block device
    def addKeyQmp(self, id, new_secret, secret = None,
                  slot = None, force = False):
        """Activate a keyslot holding *new_secret* on node *id*.

        *secret* optionally names the existing secret to authenticate with,
        *slot* optionally selects the keyslot, and *force* requests the
        'force' flag of the amend command.
        """
        crypt_options = {
            'state': 'active',
            'new-secret': new_secret.id(),
            'iter-time': 10,
        }
        if slot is not None:
            crypt_options['keyslot'] = slot
        if secret is not None:
            crypt_options['secret'] = secret.id()

        self._amendKeyslots(id, 'job_add_key', crypt_options, force)

    # erase a key from an encrypted block device
    def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
        """Deactivate keyslots on node *id*.

        The keyslots are selected by *old_secret* and/or by *slot*; *force*
        requests the 'force' flag of the amend command.
        """
        crypt_options = {'state': 'inactive'}
        if slot is not None:
            crypt_options['keyslot'] = slot
        if old_secret is not None:
            crypt_options['old-secret'] = old_secret.id()

        self._amendKeyslots(id, 'job_erase_key', crypt_options, force)

    ###########################################################################
    # create image, and change its key
    def testChangeKey(self):
        """Replace the image's only key, then reopen with the new one."""
        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # add key to slot 5
        self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # reopen the image with secret1
        self.closeImageQmp("testdev")
        self.openImageQmp("testdev", test_img, self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    # test that if we erase the old password,
    # we can still change the encryption keys using 'old-secret'
    def testOldPassword(self):
        """Add a key after the secret the image was opened with is erased."""
        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # this will fail as the old password is no longer valid
        self.addKeyQmp("testdev", new_secret = self.secrets[2])

        # this will work
        self.addKeyQmp("testdev",
                       new_secret = self.secrets[2],
                       secret = self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    def testUseForceLuke(self):
        """Exercise the 'force' flag when overwriting and erasing keyslots."""
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # Add bunch of secrets
        self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
        self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)

        # overwrite an active secret: first without, then with force
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2,
                       force=True)

        self.addKeyQmp("testdev", new_secret = self.secrets[0])

        # Now erase all the secrets
        self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
        self.eraseKeyQmp("testdev", slot=4)

        # erase last keyslot: first without, then with force
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)

        self.closeImageQmp("testdev")
        os.remove(test_img)


if __name__ == '__main__':
    iotests.verify_working_luks()
    # Encrypted formats support
    iotests.activate_logging()
    iotests.main(supported_fmts = ['qcow2', 'luks'])