Compare commits
4 commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
48cd7869b2 | ||
![]() |
f85db5191e | ||
![]() |
0db6c387bb | ||
![]() |
376271f79d |
7 changed files with 581 additions and 0 deletions
188
dirf/Config.py
Normal file
188
dirf/Config.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
__all__ = [ "Config" ]
|
||||
|
||||
from typing import Final, Literal, TextIO, cast
|
||||
import os.path
|
||||
from portalocker import Lock
|
||||
from portalocker.exceptions import AlreadyLocked
|
||||
|
||||
from .mytypes import *
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
|
||||
_file:Final[TextIO]
|
||||
_source:Final[str]
|
||||
_dest:Final[str]
|
||||
_paths_to_ignore:Final[list[PathAndType]]
|
||||
_paths_to_sync:Final[list[PathAndType]]
|
||||
_paths_added_to_ignore:list[PathAndType]
|
||||
_paths_added_to_sync:list[PathAndType]
|
||||
def __init__(self, filename:str, *,
|
||||
source:str|None, dest:str|None,
|
||||
) -> None:
|
||||
try:
|
||||
self._file = cast(TextIO,
|
||||
Lock(filename, "r").acquire(fail_when_locked=True)
|
||||
)
|
||||
except AlreadyLocked:
|
||||
raise Exception(f"the config file '{filename}' is already used.")
|
||||
lines = self._file.readlines()
|
||||
|
||||
self._paths_to_ignore = []
|
||||
self._paths_to_sync = []
|
||||
|
||||
config_source:tuple[int,str]|None = None
|
||||
config_dest:tuple[int,str]|None = None
|
||||
|
||||
current_category:list[PathAndType]|None = None
|
||||
|
||||
for i in range(len(lines)):
|
||||
line = lines[i].rstrip()
|
||||
head = line.split()[0]
|
||||
body = line[len(head):].lstrip()
|
||||
|
||||
if line == "":
|
||||
continue
|
||||
|
||||
elif line.startswith("#"):
|
||||
continue
|
||||
|
||||
elif head == "SOURCE":
|
||||
if config_source is not None:
|
||||
raise Exception(f"line {i+1}: the config already "
|
||||
+ f"defined SOURCE at line {config_source[0]+1}.")
|
||||
if os.path.isabs(body):
|
||||
raise Exception(f"line {i+1}: "
|
||||
+ "this ain't an absolute path.")
|
||||
config_source = i, body
|
||||
|
||||
elif head == "DEST":
|
||||
if config_dest is not None:
|
||||
raise Exception(f"line {i+1}: the config already "
|
||||
+ f"defined DEST at line {config_dest[0]+1}.")
|
||||
body = line[4:].lstrip()
|
||||
if os.path.isabs(body):
|
||||
raise Exception(f"line {i+1}: "
|
||||
+ "this ain't an absolute path.")
|
||||
config_dest = i, body
|
||||
|
||||
elif line == "[IGNORE]":
|
||||
current_category = self._paths_to_ignore
|
||||
|
||||
elif line == "[SYNC]":
|
||||
current_category = self._paths_to_sync
|
||||
|
||||
elif head in ["d", "f", "l"]:
|
||||
head = cast(Literal["d","f","l"], head)
|
||||
if current_category is None:
|
||||
raise Exception(f"line {i+1}: "
|
||||
+ "the path is defined before its category name.")
|
||||
if os.path.isabs(body):
|
||||
raise Exception(f"line {i+1}: "
|
||||
+ "this must be an absolute path.")
|
||||
if any([
|
||||
body in [v[1] for v in l]
|
||||
for l in [self._paths_to_sync, self._paths_to_ignore]
|
||||
]):
|
||||
raise Exception(f"line {i+1}: "
|
||||
+ "this path was already used.")
|
||||
current_category.append( (head, body) )
|
||||
|
||||
else:
|
||||
raise Exception(f"line {i+1}: what does that mean")
|
||||
|
||||
if dest is None and config_dest is None:
|
||||
raise Exception("'dest' is unknown.")
|
||||
if (dest is not None and config_dest is not None
|
||||
and dest != config_dest[1]
|
||||
):
|
||||
raise Exception("the 'dest' given to Config() is different "
|
||||
+ "than in the config file.")
|
||||
self._dest = config_dest[1] if config_dest is not None else (
|
||||
cast(str, dest)
|
||||
)
|
||||
|
||||
if source is None and config_source is None:
|
||||
raise Exception("'source' is unknown.")
|
||||
if (source is not None and config_source is not None
|
||||
and source != config_source[1]
|
||||
):
|
||||
raise Exception("the 'source' given to Config() is different "
|
||||
+ "than in the config file.")
|
||||
self._source = config_source[1] if config_source is not None else (
|
||||
cast(str, source)
|
||||
)
|
||||
|
||||
self._paths_added_to_ignore = []
|
||||
self._paths_added_to_sync = []
|
||||
|
||||
|
||||
def toIgnore(self,
|
||||
path:str, pathtype:PathType
|
||||
) -> bool|Literal["different_types"]:
|
||||
if path in [
|
||||
path[1] for path in self._paths_to_ignore
|
||||
if pathtype != path[0]
|
||||
]:
|
||||
return "different_types"
|
||||
return path in [ path[1] for path in self._paths_to_ignore ]
|
||||
|
||||
|
||||
def toSync(self,
|
||||
path:str, pathtype:PathType
|
||||
) -> bool|Literal["different_types"]:
|
||||
if path in [
|
||||
path[1] for path in self._paths_to_sync
|
||||
if pathtype != path[0]
|
||||
]:
|
||||
return "different_types"
|
||||
return path in [ path[1] for path in self._paths_to_sync ]
|
||||
|
||||
|
||||
def flush(self) -> None:
|
||||
if not self._file.writable():
|
||||
return
|
||||
raise NotImplemented
|
||||
|
||||
|
||||
def addToIgnore(self,
|
||||
path:str, pathtype:PathType
|
||||
) -> bool|Literal["different_types"]:
|
||||
if not self._file.writable():
|
||||
raise Exception("the file is read-only.")
|
||||
if not self.toIgnore(path, pathtype) is False:
|
||||
if self.toIgnore(path, pathtype) == "different_types":
|
||||
return "different_types"
|
||||
return False
|
||||
self._paths_to_ignore.append( (pathtype, path) )
|
||||
self._paths_added_to_ignore.append( (pathtype, path) )
|
||||
return True
|
||||
|
||||
|
||||
def addToSync(self,
|
||||
path:str, pathtype:PathType
|
||||
) -> bool|Literal["different_types"]:
|
||||
if not self._file.writable():
|
||||
raise Exception("the file is read-only.")
|
||||
if not self.toSync(path, pathtype) is False:
|
||||
if self.toSync(path, pathtype) == "different_types":
|
||||
return "different_types"
|
||||
return False
|
||||
self._paths_to_sync.append( (pathtype, path) )
|
||||
self._paths_added_to_sync.append( (pathtype, path) )
|
||||
return True
|
||||
|
||||
def __enter__(self) -> "Config":
|
||||
return self
|
||||
|
||||
def __exit__(self, *_) -> None:
|
||||
self.flush()
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.flush()
|
||||
|
||||
def getSource(self) -> str:
|
||||
return self._source
|
||||
def getDest(self) -> str:
|
||||
return self._dest
|
0
dirf/__init__.py
Normal file
0
dirf/__init__.py
Normal file
10
dirf/__main__.py
Normal file
10
dirf/__main__.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
import argparse
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
232
dirf/diff.py
Normal file
232
dirf/diff.py
Normal file
|
@ -0,0 +1,232 @@
|
|||
__add__ = [ "diff" ]
|
||||
|
||||
from collections.abc import Set
|
||||
from os import listdir, readlink
|
||||
from os.path import abspath
|
||||
from typing import Callable, Final, assert_never
|
||||
|
||||
from .mytypes import *
|
||||
from .Config import *
|
||||
from .utils import *
|
||||
from .whatsthispath import *
|
||||
from .utils import *
|
||||
|
||||
|
||||
class DiffResult:
|
||||
"""
|
||||
(see 'diff'.)
|
||||
"""
|
||||
|
||||
|
||||
type RelPath = Path
|
||||
"""
|
||||
represents a path relative to the root of the two compared directories.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, first:AbsPath, second:AbsPath, subpath:RelPath) -> None:
|
||||
"""
|
||||
(please create me using 'diff'.)
|
||||
"""
|
||||
|
||||
self.first:Final[AbsPath] = first
|
||||
"""
|
||||
path of the first directory compared.
|
||||
"""
|
||||
|
||||
self.second:Final[AbsPath] = second
|
||||
"""
|
||||
path of the second directory compared.
|
||||
"""
|
||||
|
||||
self.subpath:Final[DiffResult.RelPath] = subpath
|
||||
"""
|
||||
self contains info from this directory.
|
||||
"""
|
||||
|
||||
self.differs:set[NameAndType] = set()
|
||||
"""
|
||||
list the children of self.subpath that have a different content
|
||||
(or link target for a symbolic link) in both directories.
|
||||
"""
|
||||
|
||||
self.same:set[NameAndType] = set()
|
||||
"""
|
||||
list the children of self.subpath that have the same content
|
||||
(or link target for a symbolic link) in both directories.
|
||||
"""
|
||||
|
||||
self.differentTypes:set[tuple[Filename,PathType,PathType]] = set()
|
||||
"""
|
||||
list the children of self.subpath that have a different PathType
|
||||
in both directories.
|
||||
|
||||
self.differentTypes[1] is the PathType of the one
|
||||
in the firstdirectory, self.differentTypes[2] is the PathType
|
||||
of the one in the second directory.
|
||||
"""
|
||||
|
||||
self.presentOnlyInFirst:set[NameAndType] = set()
|
||||
"""
|
||||
list the children of self.subpath that are present
|
||||
only in the first directory.
|
||||
"""
|
||||
|
||||
self.presentOnlyInSecond:set[NameAndType] = set()
|
||||
"""
|
||||
list the children of self.subpath that are present
|
||||
only in the second directory.
|
||||
"""
|
||||
|
||||
self.weird:set[tuple[Filename,PathTypeW|None,PathTypeW|None]] = set()
|
||||
"""
|
||||
list the children that have an unknown type in at least one
|
||||
of the two directories. They are ignored whe comparing two directories.
|
||||
|
||||
self.weird[1] is the type of the one
|
||||
in the firstdirectory, self.weird[2] is the type
|
||||
of the one in the second directory. 'None' means the file doesn't exist
|
||||
in the directory.
|
||||
"""
|
||||
|
||||
self.subresults:dict[Filename,DiffResult] = {}
|
||||
"""
|
||||
for any directory in self.differs there is a corresponding DiffResult.
|
||||
"""
|
||||
|
||||
|
||||
def print(self) -> None:
|
||||
"""
|
||||
print a complete representation of self.
|
||||
"""
|
||||
|
||||
def printWith[T](label:str, what:set[T], f:Callable[[T],str]):
|
||||
if len(what) == 0:
|
||||
return
|
||||
print(label + ":")
|
||||
for v in what:
|
||||
print(f"-\t{f(v)}")
|
||||
|
||||
title(f"comparaison of the two directories")
|
||||
title(f"- {self.first}")
|
||||
title(f"- {self.second}")
|
||||
if self.subpath != "/":
|
||||
title(f"in the subdirectory {repr(self.subpath)}")
|
||||
print()
|
||||
|
||||
printWith("same", self.same, reprFileAndType)
|
||||
|
||||
printWith("different", self.differs, reprFileAndType)
|
||||
|
||||
printWith("different types", self.differentTypes,
|
||||
lambda v: f"{reprPathType(v[1])} / {reprPathType(v[2])}: {v[0]}"
|
||||
)
|
||||
|
||||
printWith(f"exists only in {self.first}",
|
||||
self.presentOnlyInFirst, reprFileAndType
|
||||
)
|
||||
|
||||
printWith(f"exists only in {self.second}",
|
||||
self.presentOnlyInSecond, reprFileAndType
|
||||
)
|
||||
|
||||
printWith("unknown types", self.weird,
|
||||
lambda v: f"{reprPathType(v[1])} / {reprPathType(v[2])}: {v[0]}"
|
||||
)
|
||||
|
||||
|
||||
def diff(pathA:str, pathB:str, *, ignored:Set[PathAndType]) -> DiffResult:
|
||||
r = _diffDirs(abspath(pathA), abspath(pathB), ignored, "/", trust=False)
|
||||
return r
|
||||
|
||||
|
||||
def _diffDirs(
|
||||
pathA:AbsPath, pathB:AbsPath, ignored:Set[PathAndType],
|
||||
currentPath:DiffResult.RelPath, *, trust:bool
|
||||
) -> DiffResult:
|
||||
|
||||
currentPathA = pathA + "/" + currentPath
|
||||
currentPathB = pathB + "/" + currentPath
|
||||
currentPathA_type = whatsthispath(currentPathA)
|
||||
currentPathB_type = whatsthispath(currentPathB)
|
||||
|
||||
if not trust:
|
||||
assert pathA[0] == "/", "'pathA' shall be an absolute path."
|
||||
assert pathB[0] == "/", "'pathB' shall be an absolute path."
|
||||
assert currentPath[0] == "/", (
|
||||
"'currentPath' shall start with '/'."
|
||||
)
|
||||
assert all([ v[1][0] == "/" for v in ignored ]), (
|
||||
"'ignoredPaths' shall contain only absolute paths."
|
||||
)
|
||||
assert currentPathA_type == 'd', "'pathA' shall be a directory."
|
||||
assert currentPathB_type == 'd', "'pathB' shall be a directory."
|
||||
|
||||
r = DiffResult(pathA, pathB, currentPath)
|
||||
|
||||
for name in set(listdir(currentPathA)) | set(listdir(currentPathB)):
|
||||
newPath:DiffResult.RelPath = currentPath + "/" + name
|
||||
newPathA:AbsPath = currentPathA + "/" + name
|
||||
newPathB:AbsPath = currentPathB + "/" + name
|
||||
newPathA_type = whatsthispath(newPathA)
|
||||
newPathB_type = whatsthispath(newPathB)
|
||||
if newPathA_type == "w" or newPathB_type == "w":
|
||||
# file is weird
|
||||
r.weird.add(
|
||||
(name, newPathA_type, newPathB_type)
|
||||
)
|
||||
elif newPathA_type is None:
|
||||
# file doesnt exist in first
|
||||
assert newPathB_type is not None
|
||||
r.presentOnlyInSecond.add((newPathB_type, name))
|
||||
elif newPathB_type is None:
|
||||
# file doesnt exist in second
|
||||
r.presentOnlyInFirst.add((newPathA_type, name))
|
||||
elif newPathA_type != newPathB_type:
|
||||
# file types are different
|
||||
r.differentTypes.add(
|
||||
(name, newPathA_type, newPathB_type)
|
||||
)
|
||||
else:
|
||||
# file type is the same
|
||||
_diffOne(r,
|
||||
pathA, pathB, ignored, newPath, name,
|
||||
newPathA_type
|
||||
)
|
||||
|
||||
return r
|
||||
|
||||
|
||||
def _diffOne(r:DiffResult, pathA:AbsPath, pathB:AbsPath,
|
||||
ignored:Set[PathAndType], path:DiffResult.RelPath, filename:str,
|
||||
what:PathType
|
||||
) -> None:
|
||||
match what:
|
||||
case "d":
|
||||
# files are directories
|
||||
rr = _diffDirs(pathA, pathB, ignored, path, trust=False) # TODO(debug) add trust=True
|
||||
r.subresults[filename] = rr
|
||||
if (len(rr.differentTypes) > 0
|
||||
or len(rr.presentOnlyInFirst) > 0
|
||||
or len(rr.presentOnlyInSecond) > 0
|
||||
or len(rr.differs) > 0
|
||||
):
|
||||
r.differs.add( (what, filename) )
|
||||
else:
|
||||
r.same.add( ( what, filename ) )
|
||||
|
||||
case "f"|"l":
|
||||
def getContent(path:str) -> str|bytes:
|
||||
nonlocal what
|
||||
if what == "l":
|
||||
return readlink(path)
|
||||
return open(path, "rb").read()
|
||||
(
|
||||
r.same if (
|
||||
getContent(pathA + "/" + path)
|
||||
== getContent(pathB + "/" + path)
|
||||
) else r.differs
|
||||
).add( (what, filename) )
|
||||
|
||||
case _:
|
||||
assert_never(what)
|
104
dirf/mytypes.py
Normal file
104
dirf/mytypes.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
__all__ = [
|
||||
"PathType",
|
||||
"PathTypeW",
|
||||
"Filename",
|
||||
"Path",
|
||||
"AbsPath",
|
||||
"NameAndType",
|
||||
"NameAndTypeW",
|
||||
"reprPathType",
|
||||
"reprFileAndType",
|
||||
"PathAndType",
|
||||
]
|
||||
|
||||
from typing import Literal
|
||||
|
||||
|
||||
type PathType = Literal["d","f","l"]
|
||||
"""
|
||||
All the types that a known element can have:
|
||||
- 'd': directory
|
||||
- 'f': file
|
||||
- 'l': symbolic link
|
||||
"""
|
||||
|
||||
type PathTypeW = PathType|Literal["w"]
|
||||
"""
|
||||
All the types that an element can have.
|
||||
To use when the type of an element can be not in PathType.
|
||||
|
||||
Superset of PathType.
|
||||
|
||||
'w' (weird) means the type is unknown.
|
||||
"""
|
||||
|
||||
|
||||
##
|
||||
## These types are just aliases to str, but they allow to define
|
||||
## what a variable can contain.
|
||||
##
|
||||
|
||||
type Filename = str
|
||||
"""
|
||||
represents the name of a file, a directory or whatever else.
|
||||
|
||||
can't contain any '/' obviously.
|
||||
"""
|
||||
|
||||
type Path = str
|
||||
"""
|
||||
represents any path.
|
||||
|
||||
must start with '/'.
|
||||
"""
|
||||
|
||||
type AbsPath = Path
|
||||
"""
|
||||
represents a Path relative to the linux root.
|
||||
"""
|
||||
|
||||
|
||||
type NameAndType = tuple[PathType,Filename]
|
||||
"""
|
||||
a Filename and its associated PathType.
|
||||
"""
|
||||
|
||||
type NameAndTypeW = tuple[PathTypeW,Filename]
|
||||
"""
|
||||
a Filename and its associated PathTypeW.
|
||||
"""
|
||||
|
||||
type PathAndType = tuple[PathType,Filename]
|
||||
"""
|
||||
a Path and its associated PathType.
|
||||
"""
|
||||
|
||||
type PathAndTypeW = tuple[PathTypeW,Filename]
|
||||
"""
|
||||
a Path and its associated PathType.
|
||||
"""
|
||||
|
||||
|
||||
def reprPathType(t:PathTypeW|None) -> str:
|
||||
"""
|
||||
return a common name in natural language of the given type.
|
||||
|
||||
t == None means the file doesn't exist.
|
||||
"""
|
||||
match t:
|
||||
case "d":
|
||||
return "directory"
|
||||
case "f":
|
||||
return "file"
|
||||
case "l":
|
||||
return "symbolic link"
|
||||
case "w":
|
||||
return "unknown type"
|
||||
case None:
|
||||
return "(doesn't exist)"
|
||||
|
||||
def reprFileAndType(v:NameAndTypeW|PathAndTypeW) -> str:
|
||||
"""
|
||||
equivalent to repr() for a NameAndTypeW.
|
||||
"""
|
||||
return f"{reprPathType(v[0])}: {v[1]}"
|
25
dirf/utils.py
Normal file
25
dirf/utils.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
__all__ = [
|
||||
"title",
|
||||
"simplifyPath",
|
||||
"mapSet",
|
||||
]
|
||||
|
||||
from typing import Callable
|
||||
|
||||
|
||||
def title(txt:str) -> None:
|
||||
return print(f"\033[1m{txt}\033[0m")
|
||||
|
||||
|
||||
def simplifyPath(path:str) -> str:
|
||||
start = "/" if path[0] == "/" else ""
|
||||
return start + "/".join(
|
||||
[ el for el in path.split("/") if el != "" ]
|
||||
)
|
||||
|
||||
|
||||
def mapSet[T](s:set[T], f:Callable[[T],T]) -> None:
|
||||
s2:set[T] = set()
|
||||
while len(s) > 0:
|
||||
s2.add(f(s.pop()))
|
||||
s.update(s2)
|
22
dirf/whatsthispath.py
Normal file
22
dirf/whatsthispath.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
__all__ = [ "whatsthispath" ]
|
||||
|
||||
import os.path
|
||||
|
||||
from .mytypes import *
|
||||
|
||||
|
||||
def whatsthispath(path:AbsPath) -> PathTypeW|None:
|
||||
"""
|
||||
get the PathTypeW of a file.
|
||||
|
||||
if the file doesn't exist, returns None.
|
||||
"""
|
||||
if os.path.islink(path):
|
||||
return "l"
|
||||
if os.path.isdir(path):
|
||||
return "d"
|
||||
if os.path.isfile(path):
|
||||
return "f"
|
||||
if os.path.exists(path):
|
||||
return "w"
|
||||
return None
|
Loading…
Add table
Reference in a new issue