add chunk stitching; twice as fast long-distance uploads:

rather than sending each file chunk as a separate HTTP request,
sibling chunks will now be fused together into larger HTTP POSTs
which results in unreasonably huge speed boosts on some routes
( `2.6x` from Norway to US-East,  `1.6x` from US-West to Finland )

the `x-up2k-hash` request header now takes a comma-separated list
of chunk hashes, which must all be sibling chunks, resulting in
one large consecutive range of file data as the post body

a new global-option `--u2sz`, default `1,64,96`, sets the target
request size to 64 MiB, allowing the settings ui to specify any
value between 1 and 96 MiB, which is cloudflare's max value

this does not cause any issues for resumable uploads; thanks to the
streaming HTTP POST parser, each chunk will be verified and written
to disk as it arrives, meaning only the untransmitted chunks will
have to be resent in the event of a connection drop -- of course
assuming there are no misconfigured WAFs or caching-proxies

the previous up2k approach of uploading each chunk in a separate HTTP
POST was inefficient in many real-world scenarios, mainly due to TCP
window-scaling behaving erratically in some IXPs / along some routes

a particular link from Norway to Virginia,US is unusably slow for
the first 4 MiB, only reaching optimal speeds after 100 MiB, and
then immediately resets the scale when the request has been sent;
connection reuse does not help in this case

on this route, the basic-uploader was somehow faster than up2k
with 6 parallel uploads; only time i've seen this
This commit is contained in:
ed 2024-07-21 23:35:37 +00:00
parent e565ad5f55
commit 132a83501e
9 changed files with 221 additions and 119 deletions

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3
from __future__ import print_function, unicode_literals
S_VERSION = "1.18"
S_BUILD_DT = "2024-06-01"
S_VERSION = "1.19"
S_BUILD_DT = "2024-07-21"
"""
u2c.py: upload to copyparty
@ -119,6 +119,7 @@ class File(object):
self.nhs = 0
# set by upload
self.nojoin = 0 # type: int
self.up_b = 0 # type: int
self.up_c = 0 # type: int
self.cd = 0
@ -130,10 +131,20 @@ class File(object):
class FileSlice(object):
"""file-like object providing a fixed window into a file"""
def __init__(self, file, cid):
def __init__(self, file, cids):
# type: (File, str) -> None
self.car, self.len = file.kchunks[cid]
self.file = file
self.cids = cids
self.car, tlen = file.kchunks[cids[0]]
for cid in cids[1:]:
ofs, clen = file.kchunks[cid]
if ofs != self.car + tlen:
raise Exception(9)
tlen += clen
self.len = tlen
self.cdr = self.car + self.len
self.ofs = 0 # type: int
self.f = open(file.abs, "rb", 512 * 1024)
@ -636,13 +647,13 @@ def handshake(ar, file, search):
return r["hash"], r["sprs"]
def upload(file, cid, pw, stats):
# type: (File, str, str, str) -> None
"""upload one specific chunk, `cid` (a chunk-hash)"""
def upload(fsl, pw, stats):
# type: (FileSlice, str, str) -> None
"""upload a range of file data, defined by one or more `cid` (chunk-hash)"""
headers = {
"X-Up2k-Hash": cid,
"X-Up2k-Wark": file.wark,
"X-Up2k-Hash": ",".join(fsl.cids),
"X-Up2k-Wark": fsl.file.wark,
"Content-Type": "application/octet-stream",
}
@ -652,15 +663,20 @@ def upload(file, cid, pw, stats):
if pw:
headers["Cookie"] = "=".join(["cppwd", pw])
f = FileSlice(file, cid)
try:
r = req_ses.post(file.url, headers=headers, data=f)
r = req_ses.post(fsl.file.url, headers=headers, data=fsl)
if r.status_code == 400:
txt = r.text
if "already got that" in txt or "already being written" in txt:
fsl.file.nojoin = 1
if not r:
raise Exception(repr(r))
_ = r.content
finally:
f.f.close()
fsl.f.close()
class Ctl(object):
@ -743,7 +759,7 @@ class Ctl(object):
self.mutex = threading.Lock()
self.q_handshake = Queue() # type: Queue[File]
self.q_upload = Queue() # type: Queue[tuple[File, str]]
self.q_upload = Queue() # type: Queue[FileSlice]
self.st_hash = [None, "(idle, starting...)"] # type: tuple[File, int]
self.st_up = [None, "(idle, starting...)"] # type: tuple[File, int]
@ -788,7 +804,8 @@ class Ctl(object):
for nc, cid in enumerate(hs):
print(" {0} up {1}".format(ncs - nc, cid))
stats = "{0}/0/0/{1}".format(nf, self.nfiles - nf)
upload(file, cid, self.ar.a, stats)
fslice = FileSlice(file, [cid])
upload(fslice, self.ar.a, stats)
print(" ok!")
if file.recheck:
@ -1062,13 +1079,24 @@ class Ctl(object):
if not hs:
kw = "uploaded" if file.up_b else " found"
print("{0} {1}".format(kw, upath))
for cid in hs:
self.q_upload.put([file, cid])
cs = hs[:]
while cs:
fsl = FileSlice(file, cs[:1])
try:
if file.nojoin:
raise Exception()
for n in range(2, self.ar.sz + 1):
fsl = FileSlice(file, cs[:n])
except:
pass
cs = cs[len(fsl.cids):]
self.q_upload.put(fsl)
def uploader(self):
while True:
task = self.q_upload.get()
if not task:
fsl = self.q_upload.get()
if not fsl:
self.st_up = [None, "(finished)"]
break
@ -1086,22 +1114,23 @@ class Ctl(object):
self.eta,
)
file, cid = task
file = fsl.file
cids = fsl.cids
try:
upload(file, cid, self.ar.a, stats)
upload(fsl, self.ar.a, stats)
except Exception as ex:
t = "upload failed, retrying: {0} #{1} ({2})\n"
eprint(t.format(file.name, cid[:8], ex))
t = "upload failed, retrying: %s #%d+%d (%s)\n"
eprint(t % (file.name, cids[0][:8], len(cids) - 1, ex))
file.cd = time.time() + self.ar.cd
# handshake will fix it
with self.mutex:
sz = file.kchunks[cid][1]
file.ucids = [x for x in file.ucids if x != cid]
sz = fsl.len
file.ucids = [x for x in file.ucids if x not in cids]
if not file.ucids:
self.q_handshake.put(file)
self.st_up = [file, cid]
self.st_up = [file, cids[0]]
file.up_b += sz
self.up_b += sz
self.up_br += sz
@ -1164,6 +1193,7 @@ source file/folder selection uses rsync syntax, meaning that:
ap = app.add_argument_group("performance tweaks")
ap.add_argument("-j", type=int, metavar="CONNS", default=2, help="parallel connections")
ap.add_argument("-J", type=int, metavar="CORES", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
ap.add_argument("--sz", type=int, metavar="MiB", default=64, help="try to make each POST this big")
ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
ap.add_argument("-ns", action="store_true", help="no status panel (for slow consoles and macos)")
ap.add_argument("--cd", type=float, metavar="SEC", default=5, help="delay before reattempting a failed handshake/upload")