add delete functionality
This commit is contained in:
parent
b42ddcc0e0
commit
2e67c1200a
|
@ -134,6 +134,33 @@ class RestoreThread(QThread):
|
|||
_process_json_error(self.json_err)
|
||||
|
||||
|
||||
class DeleteThread(QThread):
|
||||
"""A lass to restore a backup with borg.
|
||||
|
||||
Args:
|
||||
archive_name (str) the name of the archive to restore.
|
||||
"""
|
||||
def __init__(self, archive_name):
|
||||
super().__init__()
|
||||
self.archive_name = archive_name
|
||||
|
||||
def stop(self):
|
||||
self.p.kill()
|
||||
self.json_err = None
|
||||
|
||||
def run(self):
|
||||
self.p = subprocess.Popen(['borg', 'delete', '--log-json',
|
||||
('::'
|
||||
+ self.archive_name)],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
encoding='utf8')
|
||||
self.json_output, self.json_err = self.p.communicate()
|
||||
self.p.wait()
|
||||
_process_json_error(self.json_err)
|
||||
|
||||
|
||||
def backup(includes, excludes=None, prefix=None):
|
||||
thread = BackupThread(includes, excludes=excludes, prefix=prefix)
|
||||
thread.run()
|
||||
|
|
|
@ -49,6 +49,7 @@ class MainWindow(QMainWindow):
|
|||
self.action_settings.triggered.connect(self.show_settings)
|
||||
self.action_backup.triggered.connect(self.create_backup)
|
||||
self.action_restore.triggered.connect(self.restore_backup)
|
||||
self.action_delete.triggered.connect(self.delete_backup)
|
||||
|
||||
def start(self):
|
||||
"""This method is intendet to be used only once at the application
|
||||
|
@ -88,25 +89,36 @@ class MainWindow(QMainWindow):
|
|||
dialog = ProgressDialog(thread)
|
||||
dialog.label_info.setText("creating a backup.")
|
||||
dialog.exec_()
|
||||
self.update_archives()
|
||||
self.update_repository_stats()
|
||||
self.update_ui()
|
||||
except BorgException as e:
|
||||
show_error(e)
|
||||
|
||||
def _get_target_path(self):
|
||||
"""Opens a file dialog and returns the opened path."""
|
||||
dlg = QFileDialog
|
||||
dlg.DirectoryOnly
|
||||
folder_name = str(dlg.getExistingDirectory(
|
||||
self, "Select Directory", os.getenv('HOME')))
|
||||
return folder_name
|
||||
|
||||
@property
|
||||
def selected_archive(self):
|
||||
return self.list_archive.currentItem().text()
|
||||
|
||||
def restore_backup(self):
|
||||
"""Restores a selected backup to the given path."""
|
||||
target_path = self._get_target_path()
|
||||
if target_path:
|
||||
archive_name = self.list_archive.currentItem().text()
|
||||
restore_path = os.path.join(target_path, archive_name)
|
||||
try:
|
||||
archive_name = self.selected_archive
|
||||
target_path = self._get_target_path()
|
||||
except AttributeError:
|
||||
error = BorgException("Please create a backup first.")
|
||||
archive_name = None
|
||||
target_path = None
|
||||
show_error(error)
|
||||
|
||||
if target_path and archive_name:
|
||||
try:
|
||||
restore_path = os.path.join(target_path, archive_name)
|
||||
thread = borg.RestoreThread(archive_name, restore_path)
|
||||
dialog = ProgressDialog(thread)
|
||||
dialog.label_info.setText("restoring a backup.")
|
||||
|
@ -115,6 +127,25 @@ class MainWindow(QMainWindow):
|
|||
except BorgException as e:
|
||||
show_error(e)
|
||||
|
||||
def delete_backup(self):
|
||||
"""Deletes the selected archive from the repository."""
|
||||
try:
|
||||
archive_name = self.selected_archive
|
||||
except AttributeError:
|
||||
error = BorgException("Please create a backup first.")
|
||||
archive_name = None
|
||||
show_error(error)
|
||||
|
||||
if archive_name:
|
||||
try:
|
||||
thread = borg.DeleteThread(archive_name)
|
||||
dialog = ProgressDialog(thread)
|
||||
dialog.label_info.setText("deleting a backup.")
|
||||
dialog.exec_()
|
||||
self.update_ui()
|
||||
except BorgException as e:
|
||||
show_error(e)
|
||||
|
||||
def _update_archives(self):
|
||||
"""Lists all the archive names in the UI."""
|
||||
self.list_archive.clear()
|
||||
|
@ -123,10 +154,11 @@ class MainWindow(QMainWindow):
|
|||
archive_names.append(archive['name'])
|
||||
self.list_archive.addItems(archive_names)
|
||||
|
||||
def update_archives(self):
|
||||
def update_ui(self):
|
||||
"""Lists all the archive names in the UI."""
|
||||
try:
|
||||
self._update_archives()
|
||||
self._update_repository_stats()
|
||||
except BorgException as e:
|
||||
show_error(e)
|
||||
|
||||
|
@ -143,9 +175,3 @@ class MainWindow(QMainWindow):
|
|||
self.label_repo_deduplicated_size.setText(
|
||||
"Deduplicated Size: "
|
||||
+ convert_size(stats['unique_csize']))
|
||||
|
||||
def update_repository_stats(self):
|
||||
try:
|
||||
self._update_repository_stats()
|
||||
except BorgException as e:
|
||||
show_error(e)
|
||||
|
|
|
@ -2,35 +2,20 @@ import os
|
|||
import sys
|
||||
import subprocess
|
||||
from time import strftime
|
||||
import shutil
|
||||
from unittest.mock import MagicMock
|
||||
from unittest import TestCase
|
||||
|
||||
from PyQt5.QtWidgets import QApplication
|
||||
|
||||
import context
|
||||
from testcase import BorgQtTestCase
|
||||
from testcase import BorgInterfaceTest
|
||||
import borg_interface as borg
|
||||
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
|
||||
class BorgQtBackupTestCase(BorgQtTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.repository_path = '/tmp/test-borgqt'
|
||||
os.environ['BORG_REPO'] = self.repository_path
|
||||
os.environ['BORG_PASSPHRASE'] = 'foo'
|
||||
os.environ['BORG_DISPLAY_PASSPHRASE'] = 'no'
|
||||
subprocess.run(['borg', 'init',
|
||||
'--encryption=repokey-blake2'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.repository_path):
|
||||
shutil.rmtree(self.repository_path)
|
||||
|
||||
class BackupTestCase(BorgInterfaceTest):
|
||||
def test_backup(self):
|
||||
borg.backup(['.'])
|
||||
output = subprocess.check_output(['borg', 'list'], encoding='utf8')
|
||||
|
@ -42,29 +27,31 @@ class BorgQtBackupTestCase(BorgQtTestCase):
|
|||
self.assertNotEqual(-1, output.find(strftime('test_%Y-%m-%d_%H:')))
|
||||
|
||||
|
||||
class BorgQtRestoreTestCase(BorgQtTestCase):
|
||||
class RestoreTestCase(BorgInterfaceTest):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.repository_path = '/tmp/test-borgqt'
|
||||
os.environ['BORG_REPO'] = self.repository_path
|
||||
os.environ['BORG_PASSPHRASE'] = 'foo'
|
||||
os.environ['BORG_DISPLAY_PASSPHRASE'] = 'no'
|
||||
subprocess.run(['borg', 'init',
|
||||
'--encryption=repokey-blake2'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
borg.backup(['.'])
|
||||
self.form.update_archives()
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.repository_path):
|
||||
shutil.rmtree(self.repository_path)
|
||||
|
||||
def test_restore(self):
|
||||
archive_name = self.form.list_archive.item(0).text()
|
||||
repo_archives = borg.get_archives()
|
||||
archive_name = repo_archives[0]['name']
|
||||
target_path = '/tmp/restore/'
|
||||
restore_path = os.path.join(target_path, archive_name)
|
||||
thread = borg.RestoreThread(archive_name, restore_path)
|
||||
thread.run()
|
||||
self.assertTrue(os.path.exists(
|
||||
os.path.join(restore_path, os.path.realpath(__file__))))
|
||||
|
||||
|
||||
class DeleteTestCase(BorgInterfaceTest):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
borg.backup(['.'])
|
||||
|
||||
def test_delete(self):
|
||||
repo_archives = borg.get_archives()
|
||||
archive_name = repo_archives[0]['name']
|
||||
thread = borg.DeleteThread(archive_name)
|
||||
thread.run()
|
||||
repo_archives = borg.get_archives()
|
||||
self.assertEqual(repo_archives, [])
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import os
|
||||
import subprocess
|
||||
import shutil
|
||||
|
||||
import unittest
|
||||
import warnings
|
||||
|
@ -20,3 +22,20 @@ class BorgQtTestCase(unittest.TestCase):
|
|||
self.dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||
self.config_path = os.path.join(self.dir_path,
|
||||
'../docs/borg_qt.conf.example')
|
||||
|
||||
|
||||
class BorgInterfaceTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.repository_path = '/tmp/test-borgqt'
|
||||
os.environ['BORG_REPO'] = self.repository_path
|
||||
os.environ['BORG_PASSPHRASE'] = 'foo'
|
||||
os.environ['BORG_DISPLAY_PASSPHRASE'] = 'no'
|
||||
subprocess.run(['borg', 'init',
|
||||
'--encryption=repokey-blake2'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.repository_path):
|
||||
shutil.rmtree(self.repository_path)
|
||||
|
|
Loading…
Reference in New Issue