remove python sources

Christian Zangl 2024-08-15 23:25:08 +02:00
parent 98bd3b2370
commit bb33b02b68
No known key found for this signature in database
GPG Key ID: 6D468AC36E2A4B3D
23 changed files with 1 addition and 1249 deletions

@@ -1,100 +0,0 @@
name: build
on: [push]
jobs:
build:
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: prep
shell: bash
run: |
python -m pip install --upgrade pip
python -m pip install pipenv pyinstaller==6.3.0
pipenv install
rm -rf build dist
- name: build
shell: bash
run: |
echo "RUNNER_OS: $RUNNER_OS"
# get path to venv site-packages (for blake3)
pipenv run python -c "import site; print(site.getsitepackages())"
SITEPKG=$(pipenv run python -c "import site; print(site.getsitepackages()[-1])")
pipenv run pyinstaller run.py --hidden-import chkbit --hidden-import chkbit_cli --onefile --name chkbit --console --paths $SITEPKG
cat build/chkbit/warn-chkbit.txt
cd dist; ls -l
if [ "$RUNNER_OS" == "Linux" ]; then
a=$(grep -oP '(?<=version = ")[^"]+' ../pyproject.toml)
b=$(grep -oP '(?<=__version__ = ")[^"]+' ../chkbit_cli/__init__.py)
if [[ $a != $b ]]; then echo "version error $a $b"; exit 1; fi
c=$(./chkbit --version)
echo $c
if [[ $a != $c ]]; then echo "version error $a $c"; exit 1; fi
tar -czf chkbit-linux_${RUNNER_ARCH}.tar.gz chkbit
elif [ "$RUNNER_OS" == "macOS" ]; then
./chkbit --version
tar -czf chkbit-macos_${RUNNER_ARCH}.tar.gz chkbit
elif [ "$RUNNER_OS" == "Windows" ]; then
./chkbit.exe --version
7z a -tzip chkbit-windows_${RUNNER_ARCH}.zip chkbit.exe
else
echo 'unknown runner'
exit 1
fi
- name: artifact
uses: actions/upload-artifact@v4
if: runner.os == 'Linux'
with:
name: binary-${{ matrix.os }}
path: dist/chkbit*.tar.gz
- name: artifact
uses: actions/upload-artifact@v4
if: runner.os == 'macOS'
with:
name: binary-${{ matrix.os }}
path: dist/chkbit*.tar.gz
- name: artifact
uses: actions/upload-artifact@v4
if: runner.os == 'Windows'
with:
name: binary-${{ matrix.os }}
path: dist/chkbit*.zip
publish:
runs-on: ubuntu-latest
needs: build
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
steps:
- name: get-artifacts
uses: actions/download-artifact@v4
with:
path: dist
merge-multiple: true
- name: list
shell: bash
run: |
find
ls -l dist
- name: publish-release
uses: softprops/action-gh-release@v1
with:
draft: true
files: dist/*

@@ -1,14 +0,0 @@
name: lint
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- uses: psf/black@stable

.gitignore

@@ -1,10 +0,0 @@
README.rst
build/
dist/
chkbit.spec
.cache/
.chkbit
.pytest_cache/
*.egg-info/
*.pyc
_*

Pipfile

@@ -1,10 +0,0 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
# keep in sync with pyproject.toml
[packages]
blake3 = ">=0.4.1"
[dev-packages]

Pipfile.lock

@@ -1,55 +0,0 @@
{
"_meta": {
"hash": {
"sha256": "6a7ad512a1990bc8991ddf430c56fc28969cd9656fb1e1e14fbec9fc51a9e364"
},
"pipfile-spec": 6,
"requires": {},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"blake3": {
"hashes": [
"sha256:0625c8679203d5a1d30f859696a3fd75b2f50587984690adab839ef112f4c043",
"sha256:0c3ce6142e385222f6de5312a9fb886270b7e63d9ffaa792571b03c4c83a7521",
"sha256:1a086cc9401fb0b09f9b4ba14444457d9b04a6d8086cd96b45ebf252afc49109",
"sha256:283860fe58b3a6d74e5be1ece78bbcd7de819b48476d7a534b989dd6ab49a083",
"sha256:2a08eeb324da701b212f348e91ba5d2708c0a596bd6691207f2504f4f771644c",
"sha256:34d7da38898ad4e0da7b8fe0bffb8c9d2788093ec202e01cd3ab24bc14049153",
"sha256:3cc656cab955ab6c18b587a8b4faa33930fea089981f76a7c64f33e4a26c1dac",
"sha256:46ffb411a477009dfa99a592d4408e43ff885ea7df30ed8c8f284e87866be56e",
"sha256:47316bdc9b4689601cefcc63e00a3e015cee1fa9864463be2b4f2e12473cb47f",
"sha256:4d99136b7f7c8adcee0f7484e74b159fd3ea58e7d1e94d5351f0e98d9cfc522f",
"sha256:510fd32d207ef2e28df3597847d5044117d110b0e549b2e467afa30a9f3ab7ee",
"sha256:796e65ae333831bafed5969c691ac806fe4957b6f39e52b4c3cf20f3c00c576f",
"sha256:87a9fc37260d355569f0be751e0054e0b37e6a4ec022f4b7107ffeede419dde2",
"sha256:931d1d0d4650a400838a7f1bf0d260209d10e9bd1981a6ed033f32361b96ab7b",
"sha256:9633e0198174eb77196f9f8b18d75449d86e8fa234727c98d685d5404f84eb8e",
"sha256:a51f48ec21706a22b4954fc17da72bd177d82d22ee434da0c5dc3aafeef5b8d3",
"sha256:a6c555d882117d638830b2f5f0fd9980bcd63286ad4c9959bc16b3df77042d6f",
"sha256:a73c5940454bd693d7172af8fad23019c2f5a9b910ed961c20bdf5a91babd9f2",
"sha256:a95cce3e8bfd7e717f901de80068ee4b5c77dc421f83eef00cf3eddd3ec8b87a",
"sha256:aa4989ea8f8bcfa057e50014b5b26cd8cfe0b1f06aa98d433976f45caf3a5580",
"sha256:c6c50122d9484a97a56888f09fcbbd23fdba94c4bf1e6fdeb036b17accae9f0c",
"sha256:d264ca87f0990f44985cf580b493508534dc6e72ace52a140cf725e42d602695",
"sha256:d51b3da140d04cd8b680bf2b3a5dc1f0cbb4f1e62b08e3d6f3b17d75b6285c41",
"sha256:d653623361da8db3406f4a90b39d38016f9f678e22099df0d5f8ab77efb7b4ae",
"sha256:e0fc4914750b63bbb15f71b2092a75b24a63fd86f6fbd621a8c133795f3d6371",
"sha256:ef534c59ae76faba1c7e1531930dadecaa7817e25aa6e6c825150c04ed243a3d",
"sha256:f8fa53818f1170611d781aadbcae809ecc2334679212b4a4b3feabb55deb594d",
"sha256:fb6a62ef04c5ec4dd4630615c6a22ddab16eb0b7887276b3449946c12eeb37a2",
"sha256:fb98f18bc5e218ff1134acb3b9f0e3588ad5e6f38b7279cce4559c8ae9d780e6",
"sha256:fe31163eb08fc3f82a2325e90cea88f2d7ad0265314a03de716f906b2a43be96"
],
"index": "pypi",
"version": "==0.4.1"
}
},
"develop": {}
}

@@ -32,10 +32,6 @@ Remember to always maintain multiple backups for comprehensive data protection.
brew install chkbit
```
- Download for [Linux, macOS or Windows](https://github.com/laktak/chkbit-py/releases).
- Get it with [pipx](https://pipx.pypa.io/latest/installation/)
```
pipx install chkbit
```
## Usage
@@ -110,7 +106,7 @@ chkbit is designed to detect "damage". To repair your files you need to think ah
Add a `.chkbitignore` file containing the names of the files/directories you wish to ignore (see the example after this list)
- each line should contain exactly one name
- you may use [Unix shell-style wildcards](https://docs.python.org/3/library/fnmatch.html)
- you may use Unix shell-style wildcards
- `*` matches everything
- `?` matches any single character
- `[seq]` matches any character in seq
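To illustrate the rules above, a hypothetical `.chkbitignore` could look like this (all names and patterns are made up):
```
# comment lines are skipped
*.tmp
Thumbs.db
/build
report-202[0-9].log
```
The `/build` entry only applies to the directory that contains the `.chkbitignore`, while the other patterns also match entries in subdirectories.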
@@ -219,26 +215,4 @@ error: detected 1 file with damage!
`DMG` indicates damage.
## Development
With pipenv (install with `pipx install pipenv`):
```
# setup
pipenv install
# run chkbit
pipenv run python3 run.py
```
To build a source distribution package from pyproject.toml
```
pipx run build
```
You can then install your own package with
```
pipx install dist/chkbit-*.tar.gz
```
The binaries are created using pyinstaller via Github actions.

@@ -1,7 +0,0 @@
from .status import Status
from .ignore import Ignore
from .input_item import InputItem
from .context import Context
from .hashfile import hashfile, hashtext
from .index import Index
from .index_thread import IndexThread

@@ -1,58 +0,0 @@
from __future__ import annotations
import queue
import chkbit
from typing import Optional
from chkbit import InputItem
class Context:
def __init__(
self,
*,
num_workers=5,
force=False,
update=False,
show_ignored_only=False,
hash_algo="blake3",
skip_symlinks=False,
index_filename=".chkbit",
ignore_filename=".chkbitignore",
):
self.num_workers = num_workers
self.force = force
self.update = update
self.show_ignored_only = show_ignored_only
self.hash_algo = hash_algo
self.skip_symlinks = skip_symlinks
self.index_filename = index_filename
self.ignore_filename = ignore_filename
if not index_filename.startswith("."):
raise Exception("The index filename must start with a dot!")
if not ignore_filename.startswith("."):
raise Exception("The ignore filename must start with a dot!")
# the input queue is used to distribute the work
# to the index threads
self.input_queue = queue.Queue()
self.result_queue = queue.Queue()
self.hit_queue = queue.Queue()
if hash_algo not in ["md5", "sha512", "blake3"]:
raise Exception(f"{hash_algo} is unknown.")
def log(self, stat: chkbit.Status, path: str):
self.result_queue.put((0, stat, path))
def hit(self, *, cfiles: int = 0, cbytes: int = 0):
self.result_queue.put((1, cfiles, cbytes))
def add_input(self, path: str, *, ignore: Optional[chkbit.Ignore] = None):
self.input_queue.put(InputItem(path, ignore=ignore))
def end_input(self):
self.input_queue.put(None)
def is_chkbit_file(self, name):
return name in [self.index_filename, self.ignore_filename]

@@ -1,34 +0,0 @@
import hashlib
from blake3 import blake3
from typing import Callable
BLOCKSIZE = 2**10 * 128 # kb
def hashfile(path: str, hash_algo: str, *, hit: Callable[[str], None]):
if hash_algo == "md5":
h = hashlib.md5()
elif hash_algo == "sha512":
h = hashlib.sha512()
elif hash_algo == "blake3":
h = blake3()
else:
raise Exception(f"algo '{hash_algo}' is unknown.")
with open(path, "rb") as f:
while True:
buf = f.read(BLOCKSIZE)
l = len(buf)
if l <= 0:
break
h.update(buf)
if hit:
hit(l)
return h.hexdigest()
def hashtext(text: str):
md5 = hashlib.md5()
md5.update(text.encode("utf-8"))
return md5.hexdigest()
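For reference, a minimal usage sketch of the `hashfile` helper above (the file name is hypothetical; in chkbit itself `Index._calc_file` supplies the `hit` callback to feed the progress counters):
```
# hypothetical usage sketch, not part of the original sources
from chkbit import hashfile

chunks = []  # collects the chunk sizes reported through the hit callback
digest = hashfile("example.dat", "blake3", hit=chunks.append)
print(f"blake3={digest}, read {sum(chunks)} bytes")
```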

@@ -1,56 +0,0 @@
from __future__ import annotations
import fnmatch
import os
import sys
import chkbit
from enum import Enum
from typing import Optional
class Ignore:
def __init__(
self,
context: chkbit.Context,
path: str,
*,
parent_ignore: Optional[chkbit.Ignore],
):
self.parent_ignore = parent_ignore
self.context = context
self.path = path
self.name = os.path.basename(path) + "/"
self.ignore = []
self.load_ignore()
@property
def ignore_filepath(self):
return os.path.join(self.path, self.context.ignore_filename)
def load_ignore(self):
if not os.path.exists(self.ignore_filepath):
return
with open(self.ignore_filepath, "r", encoding="utf-8") as f:
text = f.read()
self.ignore = list(
filter(
lambda x: x and x[0] != "#" and len(x.strip()) > 0, text.splitlines()
)
)
def should_ignore(self, name: str, *, fullname: str = None):
for ignore in self.ignore:
if ignore.startswith("/"):
if fullname:
continue
else:
ignore = ignore[1:]
if fnmatch.fnmatch(name, ignore):
return True
if fullname and fnmatch.fnmatch(fullname, ignore):
return True
if self.parent_ignore:
return self.parent_ignore.should_ignore(
fullname or name, fullname=self.name + (fullname or name)
)
return False
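As a rough illustration (paths and patterns are hypothetical): an `Ignore` is created per directory with a keyword-only reference to its parent, and `should_ignore` is then queried for each entry name:
```
# hypothetical usage sketch, not part of the original sources
from chkbit import Context, Ignore

ctx = Context()                              # default settings, reads .chkbitignore files
root = Ignore(ctx, ".", parent_ignore=None)  # loads ./.chkbitignore if it exists
print(root.should_ignore("notes.tmp"))       # True when a pattern such as *.tmp matches
```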

@@ -1,163 +0,0 @@
from __future__ import annotations
import fnmatch
import os
import subprocess
import sys
import json
import chkbit
from chkbit import hashfile, hashtext, Status
from typing import Optional
VERSION = 2 # index version
class Index:
def __init__(
self,
context: chkbit.Context,
path: str,
files: list[str],
*,
readonly: bool = False,
):
self.context = context
self.path = path
self.files = files
self.old = {}
self.new = {}
self.updates = []
self.modified = None
self.readonly = readonly
@property
def index_filepath(self):
return os.path.join(self.path, self.context.index_filename)
def _setmod(self, value=True):
self.modified = value
def _log(self, stat: Status, name: str):
self.context.log(stat, os.path.join(self.path, name))
# calc new hashes for this index
def calc_hashes(self, *, ignore: Optional[chkbit.Ignore] = None):
for name in self.files:
if ignore and ignore.should_ignore(name):
self._log(Status.IGNORE, name)
continue
a = self.context.hash_algo
# check previously used hash
if name in self.old:
old = self.old[name]
if "md5" in old:
# legacy structure
a = "md5"
self.old[name] = {"mod": old["mod"], "a": a, "h": old["md5"]}
elif "a" in old:
a = old["a"]
self.new[name] = self._calc_file(name, a)
else:
if self.readonly:
self.new[name] = self._list_file(name, a)
else:
self.new[name] = self._calc_file(name, a)
def show_ignored_only(self, ignore: chkbit.Ignore):
for name in self.files:
if ignore.should_ignore(name):
self._log(Status.IGNORE, name)
# check/update the index (old vs new)
def check_fix(self, force: bool):
for name in self.new.keys():
if not name in self.old:
self._log(Status.NEW, name)
self._setmod()
continue
a = self.old[name]
b = self.new[name]
amod = a["mod"]
bmod = b["mod"]
if a["h"] == b["h"]:
# ok, if the content stays the same the mod time does not matter
self._log(Status.OK, name)
if amod != bmod:
self._setmod()
continue
if amod == bmod:
# damage detected
self._log(Status.ERR_DMG, name)
# replace with old so we don't lose the information on the next run
# unless force is set
if not force:
self.new[name] = a
else:
self._setmod()
elif amod < bmod:
# ok, the file was updated
self._log(Status.UPDATE, name)
self._setmod()
elif amod > bmod:
self._log(Status.WARN_OLD, name)
self._setmod()
def _list_file(self, name: str, a: str):
# produce a dummy entry for new files when the index is not updated
return {
"mod": None,
"a": a,
"h": None,
}
def _calc_file(self, name: str, a: str):
path = os.path.join(self.path, name)
info = os.stat(path)
mtime = int(info.st_mtime * 1000)
res = {
"mod": mtime,
"a": a,
"h": hashfile(path, a, hit=lambda l: self.context.hit(cbytes=l)),
}
self.context.hit(cfiles=1)
return res
def save(self):
if self.modified:
if self.readonly:
raise Exception("Error trying to save a readonly index.")
data = {"v": VERSION, "idx": self.new}
text = json.dumps(self.new, separators=(",", ":"))
data["idx_hash"] = hashtext(text)
with open(self.index_filepath, "w", encoding="utf-8") as f:
json.dump(data, f, separators=(",", ":"))
self._setmod(False)
return True
else:
return False
def load(self):
if not os.path.exists(self.index_filepath):
return False
self._setmod(False)
with open(self.index_filepath, "r", encoding="utf-8") as f:
data = json.load(f)
if "data" in data:
# extract old format from js version
for item in json.loads(data["data"]):
self.old[item["name"]] = {
"mod": item["mod"],
"a": "md5",
"h": item["md5"],
}
elif "idx" in data:
self.old = data["idx"]
text = json.dumps(self.old, separators=(",", ":"))
if data.get("idx_hash") != hashtext(text):
self._setmod()
self._log(Status.ERR_IDX, self.index_filepath)
return True

@@ -1,81 +0,0 @@
from __future__ import annotations
import os
import sys
import time
import threading
import chkbit
from chkbit import Index, Status, Ignore
class IndexThread:
def __init__(self, thread_no: int, context: chkbit.Context):
self.thread_no = thread_no
self.update = context.update
self.context = context
self.input_queue = context.input_queue
self.t = threading.Thread(target=self._run)
self.t.daemon = True
self.t.start()
def _process_root(self, iitem: chkbit.InputItem):
files = []
dirs = []
# load files and subdirs
for name in os.listdir(path=iitem.path):
path = os.path.join(iitem.path, name)
if name[0] == ".":
if self.context.show_ignored_only and not self.context.is_chkbit_file(
name
):
self.context.log(Status.IGNORE, path)
continue
if os.path.isdir(path):
if self.context.skip_symlinks and os.path.islink(path):
pass
else:
dirs.append(name)
elif os.path.isfile(path):
files.append(name)
# load index
index = Index(self.context, iitem.path, files, readonly=not self.update)
index.load()
# load ignore
ignore = Ignore(self.context, iitem.path, parent_ignore=iitem.ignore)
if self.context.show_ignored_only:
index.show_ignored_only(ignore)
else:
# calc the new hashes
index.calc_hashes(ignore=ignore)
# compare
index.check_fix(self.context.force)
# save if update is set
if self.update:
if index.save():
self.context.log(Status.UPDATE_INDEX, "")
# process subdirs
for name in dirs:
if not ignore.should_ignore(name):
self.context.add_input(os.path.join(iitem.path, name), ignore=ignore)
else:
self.context.log(Status.IGNORE, name + "/")
def _run(self):
while True:
iitem = self.input_queue.get()
if iitem is None:
break
try:
self._process_root(iitem)
except Exception as e:
self.context.log(Status.INTERNALEXCEPTION, f"{iitem.path}: {e}")
self.input_queue.task_done()
def join(self):
self.t.join()

@@ -1,9 +0,0 @@
from __future__ import annotations
from typing import Optional
import chkbit
class InputItem:
def __init__(self, path: str, *, ignore: Optional[chkbit.Ignore] = None):
self.path = path
self.ignore = ignore

@@ -1,28 +0,0 @@
from __future__ import annotations
from enum import Enum
import logging
class Status(Enum):
ERR_DMG = "DMG"
ERR_IDX = "EIX"
WARN_OLD = "old"
NEW = "new"
UPDATE = "upd"
OK = "ok "
IGNORE = "ign"
INTERNALEXCEPTION = "EXC"
UPDATE_INDEX = "iup"
@staticmethod
def get_level(status: Status):
if status == Status.INTERNALEXCEPTION:
return logging.CRITICAL
elif status in [Status.ERR_DMG, Status.ERR_IDX]:
return logging.ERROR
if status == Status.WARN_OLD:
return logging.WARNING
elif status in [Status.NEW, Status.UPDATE, Status.OK, Status.IGNORE]:
return logging.INFO
else:
return logging.DEBUG

@@ -1,6 +0,0 @@
from .cli import CLI
from .progress import Progress
from .sparklines import sparkify
from .rate_calc import RateCalc
__version__ = "4.2.1"

@@ -1,61 +0,0 @@
import os
import sys
class CLI:
NO_COLOR = os.environ.get("NO_COLOR", "")
class style:
reset = "\033[0m"
bold = "\033[01m"
disable = "\033[02m"
underline = "\033[04m"
reverse = "\033[07m"
strikethrough = "\033[09m"
invisible = "\033[08m"
class esc:
up = "\033[A"
down = "\033[B"
right = "\033[C"
left = "\033[D"
@staticmethod
def clear_line(opt=0):
# 0=to end, 1=from start, 2=all
return "\033[" + str(opt) + "K"
@staticmethod
def write(*text):
for t in text:
sys.stdout.write(str(t))
sys.stdout.flush()
@staticmethod
def printline(*text):
CLI.write(*text, CLI.esc.clear_line(), "\n")
# 4bit system colors
@staticmethod
def fg4(col):
# black=0,red=1,green=2,orange=3,blue=4,purple=5,cyan=6,lightgrey=7
# darkgrey=8,lightred=9,lightgreen=10,yellow=11,lightblue=12,pink=13,lightcyan=14
if CLI.NO_COLOR:
return ""
else:
return f"\033[{(30+col) if col<8 else (90-8+col)}m"
# 8bit xterm colors
@staticmethod
def fg8(col):
if CLI.NO_COLOR:
return ""
else:
return f"\033[38;5;{col}m"
@staticmethod
def bg8(col):
if CLI.NO_COLOR:
return ""
else:
return f"\033[48;5;{col}m"

@@ -1,393 +0,0 @@
import argparse
import logging
import os
import queue
import shutil
import sys
import threading
import time
from datetime import datetime, timedelta
from chkbit import Context, Status, IndexThread
from . import CLI, Progress, RateCalc, sparkify, __version__
EPILOG = """
.chkbitignore rules:
each line should contain exactly one name
you may use Unix shell-style wildcards (see README)
lines starting with `#` are skipped
lines starting with `/` are only applied to the current directory
Status codes:
DMG: error, data damage detected
EIX: error, index damaged
old: warning, file replaced by an older version
new: new file
upd: file updated
ok : check ok
ign: ignored (see .chkbitignore)
EXC: internal exception
"""
UPDATE_INTERVAL = timedelta(milliseconds=700)
MB = 1024 * 1024
CLI_BG = CLI.bg8(240)
CLI_SEP = "|"
CLI_SEP_FG = CLI.fg8(235)
CLI_FG1 = CLI.fg8(255)
CLI_FG2 = CLI.fg8(228)
CLI_FG3 = CLI.fg8(202)
CLI_OK_FG = CLI.fg4(2)
CLI_ALERT_FG = CLI.fg4(1)
class Main:
def __init__(self):
self.stdscr = None
self.dmg_list = []
self.err_list = []
self.num_idx_upd = 0
self.num_new = 0
self.num_upd = 0
self.verbose = False
self.log = logging.getLogger("")
self.log_verbose = False
self.progress = Progress.Fancy
self.total = 0
self.term_width = shutil.get_terminal_size()[0]
max_stat = int((self.term_width - 70) / 2)
self.fps = RateCalc(timedelta(seconds=1), max_stat=max_stat)
self.bps = RateCalc(timedelta(seconds=1), max_stat=max_stat)
# disable
self.log.setLevel(logging.CRITICAL + 1)
def _log(self, stat: Status, path: str):
if stat == Status.UPDATE_INDEX:
self.num_idx_upd += 1
else:
if stat == Status.ERR_DMG:
self.total += 1
self.dmg_list.append(path)
elif stat == Status.INTERNALEXCEPTION:
self.err_list.append(path)
elif stat in [Status.OK, Status.UPDATE, Status.NEW]:
self.total += 1
if stat == Status.UPDATE:
self.num_upd += 1
elif stat == Status.NEW:
self.num_new += 1
lvl = Status.get_level(stat)
if self.log_verbose or not stat in [Status.OK, Status.IGNORE]:
self.log.log(lvl, f"{stat.value} {path}")
if self.verbose or not stat in [Status.OK, Status.IGNORE]:
CLI.printline(
CLI_ALERT_FG if lvl >= logging.WARNING else "",
stat.value,
" ",
path,
CLI.style.reset,
)
def _res_worker(self, context: Context):
last = datetime.now()
while True:
try:
item = self.result_queue.get(timeout=0.2)
now = datetime.now()
if not item:
if self.progress == Progress.Fancy:
CLI.printline("")
break
t, *p = item
if t == 0:
self._log(*p)
last = datetime.min
else:
self.fps.push(now, p[0])
self.bps.push(now, p[1])
self.result_queue.task_done()
except queue.Empty:
now = datetime.now()
pass
if last + UPDATE_INTERVAL < now:
last = now
if self.progress == Progress.Fancy:
stat_f = f"{self.fps.last} files/s"
stat_b = f"{int(self.bps.last/MB)} MB/s"
stat = f"[{'RW' if context.update else 'RO'}:{context.num_workers}] {self.total:>5} files $ {sparkify(self.fps.stats)} {stat_f:13} $ {sparkify(self.bps.stats)} {stat_b}"
stat = stat[: self.term_width - 1]
stat = stat.replace("$", CLI_SEP_FG + CLI_SEP + CLI_FG2, 1)
stat = stat.replace("$", CLI_SEP_FG + CLI_SEP + CLI_FG3, 1)
CLI.write(
CLI_BG,
CLI_FG1,
stat,
CLI.esc.clear_line(),
CLI.style.reset,
"\r",
)
elif self.progress == Progress.Plain:
print(self.total, end="\r")
def process(self, args):
if args.update and args.show_ignored_only:
print("Error: use either --update or --show-ignored-only!", file=sys.stderr)
return None
context = Context(
num_workers=args.workers,
force=args.force,
update=args.update,
show_ignored_only=args.show_ignored_only,
hash_algo=args.algo,
skip_symlinks=args.skip_symlinks,
index_filename=args.index_name,
ignore_filename=args.ignore_name,
)
self.result_queue = context.result_queue
# put the initial paths into the queue
for path in args.paths:
context.add_input(path)
# start indexing
workers = [IndexThread(i, context) for i in range(context.num_workers)]
# log the results from the workers
res_worker = threading.Thread(target=self._res_worker, args=(context,))
res_worker.daemon = True
res_worker.start()
# wait for work to finish
context.input_queue.join()
# signal workers to exit
for worker in workers:
context.end_input()
# signal res_worker to exit
self.result_queue.put(None)
for worker in workers:
worker.join()
res_worker.join()
return context
def print_result(self, context):
def cprint(col, text):
if self.progress == Progress.Fancy:
CLI.printline(col, text, CLI.style.reset)
else:
print(text)
def eprint(col, text):
if self.progress == Progress.Fancy:
CLI.write(col)
print(text, file=sys.stderr)
CLI.write(CLI.style.reset)
else:
print(text, file=sys.stderr)
iunit = lambda x, u: f"{x} {u}{'s' if x!=1 else ''}"
iunit2 = lambda x, u1, u2: f"{x} {u2 if x!=1 else u1}"
if self.progress != Progress.Quiet:
status = f"Processed {iunit(self.total, 'file')}{' in readonly mode' if not context.update else ''}."
cprint(CLI_OK_FG, status)
self.log.info(status)
if self.progress == Progress.Fancy and self.total > 0:
elapsed = datetime.now() - self.fps.start
elapsed_s = elapsed.total_seconds()
print(f"- {str(elapsed).split('.')[0]} elapsed")
print(
f"- {(self.fps.total+self.fps.current)/elapsed_s:.2f} files/second"
)
print(
f"- {(self.bps.total+self.bps.current)/MB/elapsed_s:.2f} MB/second"
)
if context.update:
if self.num_idx_upd:
cprint(
CLI_OK_FG,
f"- {iunit2(self.num_idx_upd, 'directory was', 'directories were')} updated\n"
+ f"- {iunit2(self.num_new, 'file hash was', 'file hashes were')} added\n"
+ f"- {iunit2(self.num_upd, 'file hash was', 'file hashes were')} updated",
)
elif self.num_new + self.num_upd > 0:
cprint(
CLI_ALERT_FG,
f"No changes were made (specify -u to update):\n"
+ f"- {iunit(self.num_new, 'file')} would have been added and\n"
+ f"- {iunit(self.num_upd, 'file')} would have been updated.",
)
if self.dmg_list:
eprint(CLI_ALERT_FG, "chkbit detected damage in these files:")
for err in self.dmg_list:
print(err, file=sys.stderr)
n = len(self.dmg_list)
status = f"error: detected {iunit(n, 'file')} with damage!"
self.log.error(status)
eprint(CLI_ALERT_FG, status)
if self.err_list:
status = "chkbit ran into errors"
self.log.error(status + "!")
eprint(CLI_ALERT_FG, status + ":")
for err in self.err_list:
print(err, file=sys.stderr)
if self.dmg_list or self.err_list:
sys.exit(1)
def run(self):
parser = argparse.ArgumentParser(
prog="chkbit",
description="Checks the data integrity of your files. See https://github.com/laktak/chkbit-py",
epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"paths", metavar="PATH", type=str, nargs="*", help="directories to check"
)
parser.add_argument(
"-u",
"--update",
action="store_true",
help="update indices (without this chkbit will verify files in readonly mode)",
)
parser.add_argument(
"--show-ignored-only", action="store_true", help="only show ignored files"
)
parser.add_argument(
"--algo",
type=str,
default="blake3",
help="hash algorithm: md5, sha512, blake3 (default: blake3)",
)
parser.add_argument(
"-f", "--force", action="store_true", help="force update of damaged items"
)
parser.add_argument(
"-s", "--skip-symlinks", action="store_true", help="do not follow symlinks"
)
parser.add_argument(
"-l",
"--log-file",
metavar="FILE",
type=str,
help="write to a logfile if specified",
)
parser.add_argument(
"--log-verbose", action="store_true", help="verbose logging"
)
parser.add_argument(
"--index-name",
metavar="NAME",
type=str,
default=".chkbit",
help="filename where chkbit stores its hashes, needs to start with '.' (default: .chkbit)",
)
parser.add_argument(
"--ignore-name",
metavar="NAME",
type=str,
default=".chkbitignore",
help="filename that chkbit reads its ignore list from, needs to start with '.' (default: .chkbitignore)",
)
parser.add_argument(
"-w",
"--workers",
metavar="N",
action="store",
type=int,
default=5,
help="number of workers to use (default: 5)",
)
parser.add_argument(
"--plain",
action="store_true",
help="show plain status instead of being fancy",
)
parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="quiet, don't show progress/information",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument(
"-V", "--version", action="store_true", help="show version information"
)
args = parser.parse_args()
if args.version:
print(__version__)
return
self.verbose = args.verbose or args.show_ignored_only
if args.log_file:
self.log_verbose = args.log_verbose
self.log.setLevel(logging.INFO)
fh = logging.FileHandler(args.log_file)
fh.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname).4s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
self.log.addHandler(fh)
if args.quiet:
self.progress = Progress.Quiet
elif not sys.stdout.isatty():
self.progress = Progress.Summary
elif args.plain:
self.progress = Progress.Plain
if args.paths:
self.log.info(f"chkbit {', '.join(args.paths)}")
context = self.process(args)
if context and not context.show_ignored_only:
self.print_result(context)
else:
parser.print_help()
def main():
try:
Main().run()
except KeyboardInterrupt:
print("abort")
sys.exit(1)
except Exception as e:
print(e, file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

@@ -1,8 +0,0 @@
from enum import Enum
class Progress(Enum):
Quiet = (0,)
Summary = (1,)
Plain = (2,)
Fancy = (3,)

@@ -1,28 +0,0 @@
from datetime import datetime, timedelta
class RateCalc:
def __init__(self, interval: timedelta, max_stat: int):
self.interval = interval
self.max_stat = max(max_stat, 10)
self.reset()
def reset(self):
self.start = datetime.now()
self.updated = self.start
self.total = 0
self.current = 0
self.stats = [0] * self.max_stat
@property
def last(self):
return self.stats[-1]
def push(self, ts: datetime, value: int):
while self.updated + self.interval < ts:
self.stats.append(self.current)
self.stats = self.stats[-self.max_stat :]
self.total += self.current
self.current = 0
self.updated += self.interval
self.current += value

@@ -1,71 +0,0 @@
import math, os, re, string, sys
"""
Copyright (c) 2021, Brandon Whaley <redkrieg@gmail.com>, et al.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
spark_chars = "▁▂▃▄▅▆▇█"
"""Eight unicode characters of (nearly) steadily increasing height."""
def sparkify(series, minimum=None, maximum=None, rows=1):
"""Converts <series> to a sparkline string.
Example:
>>> sparkify([ 0.5, 1.2, 3.5, 7.3, 8.0, 12.5, float("nan"), 15.0, 14.2, 11.8, 6.1,
... 1.9 ])
u'▁▁▂▄▅▇ ██▆▄▂'
>>> sparkify([1, 1, -2, 3, -5, 8, -13])
u'▆▆▅▆▄█▁'
Raises ValueError if input data cannot be converted to float.
Raises TypeError if series is not an iterable.
"""
series = [float(n) for n in series]
if all(not math.isfinite(n) for n in series):
return " " * len(series)
minimum = min(filter(math.isfinite, series)) if minimum is None else minimum
maximum = max(filter(math.isfinite, series)) if maximum is None else maximum
data_range = maximum - minimum
if data_range == 0.0:
# Graph a baseline if every input value is equal.
return "".join([spark_chars[0] if math.isfinite(i) else " " for i in series])
row_res = len(spark_chars)
resolution = row_res * rows
coefficient = (resolution - 1.0) / data_range
def clamp(n):
return min(max(n, minimum), maximum)
def spark_index(n):
"""An integer from 0 to (resolution-1) proportional to the data range"""
return int(round((clamp(n) - minimum) * coefficient))
output = []
for r in range(rows - 1, -1, -1):
row_out = []
row_min = row_res * r
row_max = row_min + row_res - 1
for n in series:
if not math.isfinite(n):
row_out.append(" ")
continue
i = spark_index(n)
if i < row_min:
row_out.append(" ")
elif i > row_max:
row_out.append(spark_chars[-1])
else:
row_out.append(spark_chars[i % row_res])
output.append("".join(row_out))
return os.linesep.join(output)

@@ -1,25 +0,0 @@
[project]
name = "chkbit"
# because of packaging issues we keep this here and in chkbit_cli/__init__.py
version = "4.2.1"
description = "chkbit checks the data integrity of your files"
authors = [
{name = "Christian Zangl", email = "laktak@cdak.net"},
]
# keep in sync with Pipfile
dependencies = [
"blake3>=0.4.1",
]
requires-python = ">=3.7.0"
readme = "README.md"
license = {file = "LICENSE"}
[project.urls]
Homepage = "https://github.com/laktak/chkbit-py"
[project.scripts]
chkbit = "chkbit_cli.main:main"
[tool.setuptools.packages.find]
include = ["chkbit","chkbit_cli"]

@@ -1 +0,0 @@
blake3>=0.3.4

run.py

@@ -1,4 +0,0 @@
from chkbit_cli.main import main
if __name__ == "__main__":
main()