0x01前情提要

目前在做一个非结构化数据解析项目,非结构化数据中占比最大的主要是PDF格式。

在解析过程中会使用pymupdf对PDF中的文本和图像做初步解析

image_H3PcvOipfe.png

api会返回一个类似上图结构的字典,

层级大概是doc→page→block→line→span→char

我们的解析逻辑代码会根据pymupdf给出的初步解析dict进行分析,生成一个中间态的dict,

但是这个中间态的dict因为包含了太多的过程数据,导致大小膨胀的十分可怕,在通常情况下这个dict是pdf源文件的10倍左右,在极端情况下可以达到30倍大小。

image_ZMeS4KBSnG.png

image_EpDjJJ8JWC.png

目前批量任务是通过spark运行的,1000w任务在切片30w task时,每task约30余条pdf,在起先多次测试爆掉内存后(从初始的10core40G到5core100G),自爆自弃的配置了1core40G的豪华配置,只想先把任务跑起来,但是还是出现了jvm虚拟机被爆了的情况

24/01/14 04:31:12 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for /share/dataproc/envs/py3.10/bin/python,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.lang.StringCoding.encode(StringCoding.java:350)
  at java.lang.String.getBytes(String.java:941)
  at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:139)
  at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$11$1.applyOrElse(EvaluatePython.scala:149)
  at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:213)
  at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$11(EvaluatePython.scala:148)
  at org.apache.spark.sql.execution.python.EvaluatePython$$$Lambda$851/617702869.apply(Unknown Source)
  at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$16$1.applyOrElse(EvaluatePython.scala:195)
  at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:213)
  at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$16(EvaluatePython.scala:182)
  at org.apache.spark.sql.execution.python.EvaluatePython$$$Lambda$929/208188683.apply(Unknown Source)
  at org.apache.spark.sql.SparkSession.$anonfun$applySchemaToPythonRDD$2(SparkSession.scala:802)
  at org.apache.spark.sql.SparkSession$$Lambda$930/21820570.apply(Unknown Source)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:89)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:80)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
  at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:734)
  at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
  at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$841/158147681.apply(Unknown Source)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
  at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)

通过监控发现,单个task在极限状态下占用内存达到了28G,虽然没有超出40G的分配上限,但还是被爆掉了,问了gpt才明白,java存储string是通过byte数组实现的,这个数组最大长度是int类型的最大值即Integer.MAX_VALUE ,也就是2^31 - 1,这个大小约等效2G大小的文本文件。

复测了导致java.lang.OutOfMemoryError的PDF文件,在本地通过python程序可以正常解析,解析后的中间态dict一个是900M+,一个是1.5G左右,考虑到单个task存在30余条pdf,因此爆掉java的string也不是那么让人意外了。

0x02 解决方案选型

这种情况下,很难在一个executor配置更多的core增加任务的并行程度,

和其他的开发同学讨论过,中间态dict太大了,可不可以不落地,但是哒咩~

这种问题怎么能难道聪明的myhloli酱呢,总有办法解决超大字典的落地问题的,

网上搜了一下json压缩,有人提到两个方案cjson和json.hpack(jsonh),但是经过测试这俩方案都有局限性,不太适合逻辑结构异常复制的json进行压缩

随手在explorer中右键点了下压缩,没想到效果意外的还不错

image_W4xqW4hI5h.png
image_RkeffA2kkO.png

但是桌面压缩的速度有点慢到离谱了 360压缩的问题,windows自带的压缩还是挺快的

试着直接在代码中使用gzip压缩

image_QbI34It1-4.png

gzip默认是用level9的,914m压缩到了107m,耗时18s,看上去感觉还可以,

讨论后,初步决定使用无损压缩方案直接对json字符串进行压缩后存储base64文本的方案。

0x03 压缩方案选型

既然选用了压缩方案,科学的实施过程是先选择一款优秀的压缩算法再进行下一步

一款优秀的压缩算法应有的特性包含两点:

1、压缩比高,也就是压缩完成之后的文件要尽可能小

2、速度快,在相同压缩比的前提下,压缩速度要尽可能快

gzip的优势是python库内置,无需引入第三方依赖,python同时也内置了lzma库,

这两个库各有特点,gzip速度稍快,压缩率稍低,lzma速度较慢,但是压缩率更高

除了内置的这两款压缩库以外,另外也通过调研,补充两个现代的、广泛使用的压缩库参与对比。

Brotli:

https://github.com/google/brotli

Brotli是Google开发的一种无损压缩算法,最初设计用于HTTP内容编码。Brotli在压缩效率和速度之间提供了一种良好的平衡,特别是在处理文本和HTML内容时。它提供了不同级别的压缩选项,允许用户在压缩速度和压缩效果之间做出选择。此外,由于Brotli的广泛支持,它可以用于各种应用中,包括Web服务器和浏览器。

Zstandard:

https://github.com/facebook/zstd

Zstandard是由Facebook开发的一种无损压缩算法,设计用于实现高压缩比和高速度。Zstd提供了一种广泛的压缩级别选择,允许用户在压缩速度和压缩效果之间做出权衡。与Brotli相比,Zstd通常能够提供更快的压缩和解压缩速度,尤其是在处理大型数据集时。

0x04 代码编写

现在AI发展的这么快,简单的测试代码已经难不倒AI了,作为一个AI领域工程师,代码还要自己写那可太out了,全部交给AI来搞定

import gzip
import time
import brotli
import zstandard
import lzma
import json
import os
import base64

# 定义压缩函数
def compress_with_method(input_str, output_file, compression_method, **kwargs):
    start = time.time()
    compressed = compression_method(input_str.encode(), **kwargs)
    with open(output_file, 'wb') as f_out:
        f_out.write(base64.b64encode(compressed))
    end = time.time()
    compressed_size = os.path.getsize(output_file)
    original_size = len(input_str.encode())
    compression_ratio = compressed_size / original_size
    return end - start, compression_ratio

def compress_with_gzip(input_str, output_file, compresslevel):
    return compress_with_method(input_str, output_file, gzip.compress, compresslevel=compresslevel)

def compress_with_brotli(input_str, output_file, quality):
    return compress_with_method(input_str, output_file, brotli.compress, quality=quality)

def compress_with_zstandard(input_str, output_file, level):
    cctx = zstandard.ZstdCompressor(level=level)
    return compress_with_method(input_str, output_file, cctx.compress)

def compress_with_lzma(input_str, output_file, preset):
    return compress_with_method(input_str, output_file, lzma.compress, preset=preset)

# 定义测试函数
def test_compression(compression_name, compression_func, param_range, file, json_str, results):
    for param in param_range:
        output_file = f'test_{compression_name}_{param}_{file}.compressed'
        elapsed_time, compression_ratio = compression_func(json_str, output_file, param)
        print(f"{compression_name} level {param} for {file}:")
        print(f"\tCompressed size: {os.path.getsize(output_file) / 1024 / 1024} MB")
        print(f"\tCompression time: {elapsed_time} seconds")
        print(f"\tCompression ratio: {compression_ratio}")
        results.append([file, f'{compression_name}_{param}', elapsed_time, os.path.getsize(output_file), compression_ratio])

# 定义文件列表
files = ["simple1.json", "simple2.json"]

results = []
# 对每个文件进行压缩
for file in files:
    # 将测试数据转换为 JSON 字符串
    with open(file, 'r', encoding='utf-8') as f_in:
        json_str = json.dumps(f_in.read())
        # 计算原始数据大小
        original_size = len(json_str.encode('utf-8'))

    # 测试 brotli 的 quality(0~11)
    test_compression('brotli', compress_with_brotli, range(0, 12), file, json_str, results)

    # 测试 zstandard 的 level(-5~22)
    test_compression('zstandard', compress_with_zstandard, range(-5, 23), file, json_str, results)

    # 测试 gzip 的 level(0-9)
    test_compression('gzip', compress_with_gzip, range(0, 10), file, json_str, results)

    # 测试 lzma 的 level(0-9)
    test_compression('lzma', compress_with_lzma, range(0, 10), file, json_str, results)

# 导入 csv 模块
import csv

# 用 'w' 模式打开一个名为 'compression_results.csv' 的文件
with open('compression_results.csv', 'w', encoding='utf-8', newline='') as f_out:
    # 创建一个 csv 写入对象
    writer = csv.writer(f_out)
    # 写入表头
    writer.writerow(['file', 'algorithm', 'time', 'size', 'ratio'])
    # 写入所有的结果
    writer.writerows(results)

全部代码写完不超过10分钟,直接开run

0x05 统计分析

测试共使用两个样例,simple1.json(914MB)和simple2.json(1.49GB)

测试机器windows 11/i7-11700 4.6GHZ/python 3.11/Brotli 1.1.0/python-zstandard 0.22.0

数据跑完,发现一个诡异的东西,gzip在level 0下跑出来的东西比原本的内容还大

image_z7WD6uPuMw.png

统计的时候排掉gzip0先画张图看看

image_f4veZ_xqun.png

从图里可以比较直观的看出来,随着压缩等级的增加,压缩率越来越逼近极限水平,压缩耗时也成倍的增加

同时可以发现,gzip即便是用最高的压缩级别,压缩率也不如其他三种算法的任何一档,因此可以直接把gzip方案排除掉

同时把耗时超长至不可接受的几个算法等级排除掉再画张图

image_ga45lnmGWb.png

这样再看会比较直观,可以看出来对于目标json文件的压缩,极限的压缩率在0.04附近

在接近极限压缩率的时候,brotli_9表现出了较高的性价比

在这张图上,time的纵坐标依然是较高的水平,我们再一步过滤掉高耗时的一些方案

image_iZkTv0MtAy.png

过滤掉耗时100秒以上的方案,结果少了很多,好像进了决赛圈一样

几个怪异的尖尖好像都是lzma带来的,先排掉lzma,再排掉两个高level的zstandard_13和zstandard_14

image_1_DTPK84HL.png

剩下这张图的结果更就比较明显了

zstandard_0/zstandard_3/zstandard_4在极低耗时下实现了0.06的压缩率

zstandard_5/zstandard_6/brotli_4 在较低耗时下实现了0.05附近的压缩率

追求更高压缩比的情况下brotli_6/brotli_7提供了较好的性价比

因为选择的测试样本是比较极端的1G+的json,实际使用中存在另外两个要点,

一是其他json没有这么大,可能普遍在几分之一的大小,二是单本PDF的解析时间也远超压缩所用的耗时,压缩耗时是个比较敏感的参数,但是也没有那么敏感。

综合考量,最后选择了brotli_7brotli_6方案作为json数据的压缩方案。

love loli,love live!