对接 discord 的 zlib 数据流,它是分段传输的,只有第一个片段包含头部信息,后续片段不包含头部信息。解压后面的片段需要依赖第一个段的 header
python 里很好实现
deobj = zlib.decompressobj()
然后只要用 deobj 按顺序解压每个片段就行了
deobj.decompress(数据流)
改写到 go 里,发现 zlib.NewReader()只能用于第一个片段,后续片段无法解压(会报 invalid header 错误) 感觉 go 的 zlib 没有提供一个能保存上下文状态的对象,每次解压都要从头开始,导致后续不完整的 zlib 流无法解压
reader := bytes.NewReader(bin1)
zlib.NewReader(reader)
reader = bytes.NewReader(bin2)
zlib.NewReader(reader) // 会报错
binmerge := append(bin1, bin2...)
reader = bytes.NewReader(binmerge)
zlib.NewReader(reader) // 正常
如果把 bin1 bin2 合并起来是可以解压的,但显然这样不合理,内存和资源占用会越来越大
所以我想只依赖 bin1 的 header 信息,然后用 bin2 的数据流来解压
除此之外还尝试过讲 bin1 的头 N 个字节放到 bin2 前面,也没成功,似乎 header 不是单纯拷贝字节那么简单,可能涉及到其他计算
问过 gpt ,没能解决。
搜过 google 等各种资料,也没有解决。
不熟悉 zlib 格式,想问问各位有没有办法实现
2 个对应的测试数据我打包放在这里了 https://drive.google.com/file/d/1zdAbgWVWewqcovaHxRq3ZhPObPnr3m-5/view?usp=sharing
1
MoYi123 319 天前
你可以写一个比较复杂的 reader 吧
用一个 queue 来实现 reader, Read(p []byte) (n int, err error) 可以知道已经读了多少, 可以释放队列头部已使用的压缩数据 到 zlib.NewReader 读不出数据的时候, 再往 queue 里添加新的压缩数据 没试, 应该是可以的. |
3
boboliu 318 天前
r := io.MultiReader(bin1, bin2, bin3)
zlib.NewReader(r) |
5
lance6716 318 天前 via Android
按照你的描述,它本身就是“一个”流,你直接把流的 reader 传过去就行啊,为啥要拆成“两个”bin1 bin2 呢
|
6
rekulas OP @lance6716 并不是我要拆开,而且平台给的数据就是这样的
比如第 5 秒的时候,平台给我一个流 1, 然后继续处理任务 第 50 秒,平台发给我流 2,通知我任务状态 我要流式处理,肯定得这样的, js python 里面都很简单, go 居然没能成功 |
9
bv 318 天前
是这样吗?
input := new(bytes.Buffer) output, _ := zlib.NewReader(input) // 压缩的数据流往 input 里面写入。 // 从 output 读取解压后的数据流。 |
10
guonaihong 318 天前
把 chan 包装成一个 io.Reader, 收数据的地方直接并发 chan , 读的地方 select chan 就行。
type myReader struct { c chan []byte } func (m *myReader) Read(p []byte) (n int, err error) { copy() } |
11
guonaihong 318 天前
忽略我上一个回答,直接用 io.Pipe 。然后 zlib 解决套下 io.Pipe 的 reader 对象。另外收 gzip 数据的地方并发写就行。
https://pkg.go.dev/io#Pipe |
12
rekulas OP @guonaihong pipe 我也试过没成功 ,也可能用法不对 空了我再试试
|
13
bv 318 天前
和 zlib 无关,只是流式解析没处理好而已。
package main import ( "bytes" "compress/zlib" "fmt" "io" "os" "sync" ) //goland:noinspection GoUnhandledErrorResult func main() { bin1, _ := os.Open("1.bin") defer bin1.Close() bin2, _ := os.Open("2.bin") defer bin2.Close() input := new(bytes.Buffer) input.ReadFrom(bin1) zr, err := zlib.NewReader(input) if err != nil { fmt.Printf("zlib error: %v\n", err) return } wg := new(sync.WaitGroup) wg.Add(1) go func() { defer wg.Done() io.Copy(os.Stderr, zr) zr.Close() fmt.Printf("\noutput over\n") }() input.ReadFrom(bin2) wg.Wait() fmt.Printf("main over\n") } 输出结果: {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}}{"t":null,"s":null,"op":11,"d":null} output over main over |
14
bv 318 天前
如果是 HTTP 客户端可以改造的更简单一些,例如:
func Get(u string) (*http.Response, error) { resp, err := http.Get(u) if err != nil { return nil, err } encoding := resp.Header.Get("Content-Encoding") if encoding == "deflate" { // 代表 body 使用了 zlib 压缩 body := resp.Body rc, exx := zlib.NewReader(body) if exx != nil { _ = body.Close() return nil, exx } resp.Body = rc } return resp, nil } |
15
guonaihong 318 天前
@rekulas 有一个简单的方法验证, 如果对端传过来的 gzip 包,都缓存到 bytes.Buffer ,完毕可以解出来。那就说明你的 io.Pipe 的用法不对。
|
17
bv 318 天前
我大概理解了你的想法:就好比分卷压缩,只要得到第一个压缩包(第一个压缩包内含有元数据),跳过任意个块包,也照样可以解压后面的任何一块压缩包。
这应该实现不了吧,块与块之间大概率是存在依赖关系的,环环相扣,一环缺失就会导致后面数据无效。不太了解 zlib 的二进制格式,OP 要想深入研究可自行查阅资料。 比如:ts (MPEG2-TS) 这种分块格式是经过设计的,每一块内都含有元数据,不依赖前后 ts 数据块,每一块都单独可用。 还有:压缩包只是个容器,里面的数据才是有用的,一段数据被分块压缩后,怎么知道想要的数据被压缩分块到了哪个块区?这也是一个问题。 |
18
rekulas OP @bv 有一点点小区别 并不是跳过任意块包,只要按顺序 1,2,3,4....依次解压即可
在 py 和 js 中确实是可以实现的, 只是 go 里面我不清楚怎么实现, 感觉它的 zlib 接口还比较简陋 |
19
lesismal 295 天前
上接: https://www.v2ex.com/t/1024087#reply11
package main import ( "bytes" "compress/zlib" "fmt" "os" ) func main() { bin1, _ := os.Open("1.bin") defer bin1.Close() bin2, _ := os.Open("2.bin") defer bin2.Close() input := new(bytes.Buffer) input.ReadFrom(bin1) zr, err := zlib.NewReader(input) if err != nil { fmt.Printf("zlib error: %v\n", err) return } defer zr.Close() buf := make([]byte, 1024) n1, err := zr.Read(buf) fmt.Println("read 1 over:", n1, err) fmt.Println("buf 1:", string(buf[:n1])) input.ReadFrom(bin2) n2, err := zr.Read(buf[n1:]) fmt.Println("read 2 over:", n2, err) fmt.Println("buf 2:", string(buf[n1:n1+n2])) } output: read 1 over: 124 <nil> buf 1: {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}} read 2 over: 36 <nil> buf 2: {"t":null,"s":null,"op":11,"d":null} |
20
lesismal 295 天前
BTW ,OP 自己的 python 代码里用的就是同一个 deobj = zlib.decompressobj(),go 里用了不同的 zlib reader 读取两个片段、第二个片段没有 header 、当然就出错了
|
22
body007 295 天前
测试没问题,就像你用 python 一样,只需要创建一个 zlib.NewReader ,使用 io.Pipe 就可以了。
```go package main import ( "compress/zlib" "errors" "io" "os" ) func main() { err := test() if err != nil { panic(err) } } func test() error { var ( ir, iw = io.Pipe() done = errors.New("done") ) go func() { list := []string{"1.bin", "2.bin"} for _, f := range list { fr, err := os.Open(f) if err != nil { iw.CloseWithError(err) return } _, err = io.Copy(iw, fr) if err1 := fr.Close(); err == nil { err = err1 } if err != nil { iw.CloseWithError(err) return } } iw.CloseWithError(done) }() fw, err := os.Create("dst.txt") if err != nil { return err } defer fw.Close() zr, err := zlib.NewReader(ir) if err != nil { return err } _, err = io.Copy(fw, zr) if err1 := zr.Close(); err == nil { err = err1 } if errors.Is(done, err) { err = nil } return err } ``` |
23
rekulas OP |
24
lesismal 295 天前
Welcome!
|