feat(parquet): opt-in streaming reads for large data pages#880
feat(parquet): opt-in streaming reads for large data pages#880joechenrh wants to merge 42 commits into
Conversation
Add a small streaming util package (parquet/internal/encoding/streaming) with ValueBuffer, an incremental byte source (streamBuffer) that reads from an io.Reader, slides as values are consumed, grows to fit oversized values, and owns the page's stream (Close runs a caller-supplied cleanup). Plus a Decoder interface (SetSource). The PLAIN BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY decoders gain a streaming path, implemented as methods in their own files: the materialized []byte logic is kept byte-for-byte and only gains a `src streaming.ValueBuffer` field plus a one-line guard in Decode/Discard dispatching to decodeStreaming. SetData resets src, so a cached decoder reused for a materialized page reverts. Materialized parity: all existing encoding tests pass; streaming verified for byte_array and FLBA with a one-byte-at-a-time reader and oversized values. Refs apache#865 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add ReaderProperties.EnablePageStreaming (default off). Eligible data pages — PLAIN-encoded V1/V2, byte_array/fixed_len_byte_array, unencrypted, with a streaming-capable codec (UNCOMPRESSED/GZIP/BROTLI/ZSTD) — skip full-page decompression in serializedPageReader.Next(): the rep/def levels are materialized (V2 by header byte-lengths; V1 peeled off the decompressed stream by readLevelData using the same length rules as LevelDecoder.SetData) and the values are exposed as a streaming.ValueBuffer over the compressed region. The page holds just that source; on release drainAndClose consumes any unread compressed bytes and closes the decompressor so the reader lands on the next page header. Ineligible pages fall back to the materialized path. initDataDecoder resolves the decoder and delegates value feeding to setDecoderData, which routes streaming pages to SetSource and materialized pages to SetData on the same cached PLAIN decoder. End-to-end round-trip tests assert streaming and materialized reads agree across V1/V2 and codecs, for required and nullable columns. Refs apache#865 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…elData Fold the chunk-constant streaming inputs (streamEnabled + physicalType + compressType) into a single canStream bool, evaluated once when the page reader is built via streamablePhysicalType/streamableCodec; streamingEligible is now one expression. Remove the unused LevelData accessor and its DataPage interface method (the level decoders read Data(), which for a streaming page is already the level region), and trim a few comments. Refs apache#865 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Rename canStream -> columnCanStream (the column/chunk-constant part: property + physical type + codec, evaluated once) and streamingEligible -> pageCanStream (the per-page check that adds the page's PLAIN encoding), so the two names spell out which dimension each judges. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e per path The streaming and materialized branches of Next() built the DataPageV1/V2 literal twice with identical fields except buf and valueSource. Construct the page once with the common fields, then set buf (materialized) or buf + valueSource (streaming) in the respective branch. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The V1 and V2 streaming branches set buf and valueSource identically (buf wraps the level region, valueSource is a stream buffer drained + closed on release), differing only in how the level bytes, value reader, limit, and closer are obtained. Extract that shared wiring into page.setStreamingValues. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror V1: build one io.LimitReader over the whole compressed page, read V2's raw (uncompressed) levels off it, then take the values from what remains, instead of reading levels directly from p.r plus a second limit reader sized to the value region. Same bytes consumed; V1 and V2 now start identically. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Both branches build the value reader/closer the same way (decompressor over the page limit reader when compressed, else the limit itself); extract that into page reader's valueStream. V1 passes true (its whole body is under the codec, a no-op passthrough when the column is uncompressed); V2 passes the header's IsCompressed. Also drop the now-obvious doc comments on the streaming helpers. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fix the stale "see streaming_decoder.go" reference in the decoders' src field docs (those methods live in the decoder files now) and drop the redundant impl-method doc for page.ValueSource (the DataPage interface already documents it). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adding ValueSource to the exported DataPage interface would break any external type that implements it. Access it instead through an unexported streamingPage interface that the built-in *DataPageV1/*DataPageV2 satisfy; DataPage is unchanged from upstream. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback on the streaming read path: - Gate streaming to the internal Column() path so the public GetColumnPageReader() always returns materialized pages, preserving Page.Data()'s "whole page" contract. Add a regression test. - Close() now releases the current page, draining and closing a streaming page's value stream; the V1 level-read error path drains the page limit via drainAndClose. - Return an error instead of panicking when a streaming page's decoder is not streaming-capable (checked type assertion). - Enable streaming only when the actual codec implements compress.StreamingCodec (a codec registered via RegisterCodec need not), and disable it on Reset, which lacks the column schema. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Back out gating page streaming to the internal Column() path. Making the public GetColumnPageReader keep a materialized-only Data() contract while Column() streams would require either that split or a lazy-materializing Data(); the level decoders read the level region through Page.Data(), so an honest full-page Data() is not a small change. Leave the Data() semantics of streaming pages for review to decide. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add page.LevelBuffer() and have initLevelDecodersV1/V2 read the rep/def level region through it instead of Page.Data(). No internal path now calls Data() on a streaming page, so the public semantics of Data() for streaming pages can be decided independently. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the level-region accessor off the shared base page onto unexported levelBuffer() methods on *DataPageV1/*DataPageV2, so dictionary pages no longer expose it and it stays out of the public API. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
columnChunkReader.Close already releases its current page (the same object the page reader holds), so having serializedPageReader.Close release curPage too freed the page's buffer twice and could panic. The column reader's release already drains a streaming page's value stream, so drop the redundant release from the page reader. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Extract a dataPageBase embedded struct (embedded by DataPageV1/V2 but not DictionaryPage) holding the value source and the levelBuffer/valueBuffer accessors, so dictionary pages no longer carry streaming members. Rename the accessor to valueBuffer() (field valueBuf) to match its type and pair with levelBuffer(). initDataDecoder/setDecoderData now take the concrete *dataPageBase, so the streamingPage interface and its type assertion are gone. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…face Add an unexported valueBuffer() to DataPage (safe: DataPage is already unimplementable outside the package since Page.Encoding returns an internal type, and the only implementers are *DataPageV1/V2). This lets initDataDecoder/setDecoderData take DataPage instead of the concrete *dataPageBase, so processPage keeps its single shared call after the type switch. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fill(consumed, need) welded together "commit consumed bytes" and "ensure need bytes"; the flush call Fill(pos, 0) was just an obscure advance. Split into Advance(n) (mark consumed) and Fill(need) (ensure/return the window). The streaming decoders now Advance before refilling and end with a plain Advance(pos). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Fill read only until need was met, so a reader returning small chunks would refill (and slide) nearly per value. Read until the buffer is full (or EOF) instead, making the batch size a property of the buffer rather than of the reader. Safe because the value stream is a bounded page region. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming decoders copied every value out because the buffer slides and gets overwritten on refill. Add FillOwned: it returns bytes that stay valid after later calls, so decodeStreaming aliases values (3-index cap, non-nil empties) exactly like the materialized path — dropping a per-value alloc+copy. A shared bit tracks whether the buffer has been handed out for aliasing; while set, a refill moves to a fresh buffer (the old one stays alive through its aliases) instead of sliding in place. Discard keeps using the reuse-in-place Fill. White-box tests cover the alias invariant. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming was the only caller; start it from a nil window like decodeStreaming so the first Fill fetches the bytes. Removes Bytes from the interface and trims the value_buffer comments. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming was the only caller; start it from a nil window like decodeStreaming so the first Fill fetches the bytes. Remove Bytes from the interface and trim verbose comments in value_buffer and the decoders. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
zeroshade
left a comment
There was a problem hiding this comment.
Thanks for tackling #865 — the incremental-decode design is clean and the commit series is easy to follow.
I reviewed the read path in depth and ran the new streaming tests plus the existing encoding and parquet/file suites with -race; all pass and materialized parity holds. The drain-before-advance ordering in Next() (release the previous page, then read the next header), the codec allowlist (correctly excluding raw-block Snappy even though it implements StreamingCodec), the V1 level byte-length parity with LevelDecoder.SetData, and the DecodeSpaced -> Decode streaming dispatch all check out.
A few things I'd like addressed before merge:
Page.Data()contract on streaming pages (blocking). A streaming page'sbufholds only the rep/def level region, so the publicPage.Data()returns truncated bytes when a page is obtained via the publicGetColumnPageReaderwithEnablePageStreamingset. Since internal callers now go throughlevelBuffer()/valueBuffer(), I'd like us to either gate streaming to the internalColumn()path or documentData()explicitly (see inline).- Unbounded allocation from an untrusted RLE length prefix in
readLevelData. - Test coverage for BIT_PACKED levels, the skip/
Discardpath, repeated columns, and ineligible-codec fallback.
Details inline. Nice work overall — with the Data() semantics resolved this is close.
| mem: r.props.Allocator(), | ||
| // Streaming (EnablePageStreaming) inputs; only the unencrypted path is | ||
| // ever streaming-eligible, so columnCanStream is left false elsewhere. | ||
| columnCanStream: r.props.EnablePageStreaming && |
There was a problem hiding this comment.
Enabling streaming here also affects the public GetColumnPageReader: a streaming DataPageV1/V2's buf holds only the rep/def level region, so Page.Data() returns just the levels instead of the whole page. That's a silent semantic change to a public method for anyone reading Data() off a data page with EnablePageStreaming set.
Now that the internal callers read levels/values through levelBuffer()/valueBuffer(), the original blocker to keeping GetColumnPageReader materialized-only (the revert in 2f5aab9) looks gone. Could we gate streaming to the internal Column() path, or — failing that — document Data() as level-only/undefined for streaming pages? This is the main thing I'd like resolved before merge.
There was a problem hiding this comment.
Add document for both EnablePageStreaming and Data() to clarify the change. My judgment is as follows. I wonder if you have any opinions:
I find that pqarrow also uses GetColumnPageReader, so I prefer explicitly document this semantic change. Besides, since this new option is off by default, existing downstream user codes shouldn't be affected when bumping the package. When a user (or an AI agent) wants to enable the option, they can find this changes from the comments from EnablePageStreaming.
| n := int(binary.LittleEndian.Uint32(hdr[:])) | ||
| start := len(buf) | ||
| buf = append(buf, hdr[:]...) | ||
| buf = append(buf, make([]byte, n)...) |
There was a problem hiding this comment.
n comes straight from the 4-byte RLE length prefix in the page and is used to make([]byte, n) before any bounds check, so a corrupt/malicious V1 page with a bogus prefix (e.g. ~4 GiB) triggers a huge allocation before the following io.ReadFull fails.
The materialized LevelDecoder.SetData guards this with nbytes > len(data)-4 against the already-bounded page buffer; the streaming path has no equivalent ceiling (V1 carries no per-level byte length in the header). Please bound n (e.g. against lenCompressed/lenUncompressed) before allocating.
There was a problem hiding this comment.
Yeah, updated to bound the allocation with uncompressed size in 3cd0076
| } | ||
| case format.Encoding_BIT_PACKED: | ||
| bitWidth := bits.Len64(uint64(maxLvl)) | ||
| n := int(bitutil.BytesForBits(int64(numValues) * int64(bitWidth))) |
There was a problem hiding this comment.
Minor/consistency: LevelDecoder.SetData computes this length with the overflow-checked shared_utils.Mul(nbuffered, bitWidth), whereas here it's a raw int64(numValues) * int64(bitWidth). Not exploitable for real level bit-widths, but matching the sibling code's checked multiply would keep the two paths in sync.
There was a problem hiding this comment.
Nice catch, I didn't notice that my AI has modified this part during reviewing. Updated in aeeb992
|
|
||
| // NewStreamBuffer returns a ValueBuffer over r. | ||
| func NewStreamBuffer(r io.Reader, onClose func() error) ValueBuffer { | ||
| return &streamBuffer{r: r, onClose: onClose, buf: make([]byte, defaultStreamBufferSize)} |
There was a problem hiding this comment.
Two notes on the backing buffer:
- It's a fresh 1 MiB (
defaultStreamBufferSize) allocation per streaming page via the Go allocator rather than the configuredmemory.Allocator, so it's untracked byCheckedAllocatorand adds GC churn on files with many eligible pages. Consider pooling it (and/or sizing relative to the read batch). - Worth calling out in the
EnablePageStreamingdoc that peak memory is reallybatch + ~1 MiB + largest-value growth + levels, not just the batch size.
There was a problem hiding this comment.
In 1b82270, I cap the size of the buffer with the uncompressed size and manage all the buffers with Allocator. And the comment for EnablePageStreaming is also aligned with the new behavior.
There was a problem hiding this comment.
Oh, maybe we can't use allocator here.
There was a problem hiding this comment.
Updated, we should store the buffer inside the valueBuffer, and resue them for each Decode call.
| // Varied sizes: several small, some > 4096 (stream buffer), some > 1024 (page). | ||
| values := makeStreamTestValues([]int{65, 100, 5000, 200, 9000, 50, 4096, 1}) | ||
|
|
||
| cases := []struct { |
There was a problem hiding this comment.
Since this ships new decode/peel logic, a few gaps I'd like covered:
- BIT_PACKED levels — only RLE levels are exercised, but
readLevelDatahas a BIT_PACKED branch. - Skip path —
discardStreaming(viaSeekToRow/skipRows) isn't covered; the tests only do fullReadBatch. - Repeated columns (
maxRepLevel > 0) — the V1 rep-level peel inreadLevelDatais untested (current cases are flat required/optional). - Ineligible-codec fallback — a case with Snappy or LZ4_RAW +
EnablePageStreaming=trueasserting correct (materialized) reads would lock in the allowlist behavior.
zeroshade
left a comment
There was a problem hiding this comment.
Thanks for tackling #865 — the incremental-decode design is clean and the commit series is easy to follow.
I reviewed the read path in depth and ran the new streaming tests plus the existing encoding and parquet/file suites with -race; all pass and materialized parity holds. The drain-before-advance ordering in Next() (release the previous page, then read the next header), the codec allowlist (correctly excluding raw-block Snappy even though it implements StreamingCodec), the V1 level byte-length parity with LevelDecoder.SetData, and the DecodeSpaced -> Decode streaming dispatch all check out.
A few things I'd like addressed before merge:
Page.Data()contract on streaming pages (blocking). A streaming page'sbufholds only the rep/def level region, so the publicPage.Data()returns truncated bytes when a page is obtained via the publicGetColumnPageReaderwithEnablePageStreamingset. Since internal callers now go throughlevelBuffer()/valueBuffer(), I'd like us to either gate streaming to the internalColumn()path or documentData()explicitly (see inline).- Unbounded allocation from an untrusted RLE length prefix in
readLevelData. - Test coverage for BIT_PACKED levels, the skip/
Discardpath, repeated columns, and ineligible-codec fallback.
Details inline. Nice work overall — with the Data() semantics resolved this is close.
Accidental duplicate submission — dismissing this copy; the identical active review remains.
readLevelData used the RLE length prefix to make([]byte, n) with no bounds check, so a corrupt V1 page with a bogus prefix could trigger a huge allocation before the following ReadFull failed. Bound the level region by the uncompressed page size (as LevelDecoder.SetData does against the page buffer), erroring on an out-of-range length. White-box tests cover the reject and accept paths. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Match LevelDecoder.SetData: size the BIT_PACKED level region with the overflow-checked utils.Mul instead of a raw int64 multiply, erroring on overflow before allocating. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Merge the reject/accept cases into one test that varies only maxBytes over a shared level region, and drop the redundant comments. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Allocate the streaming ValueBuffer's backing buffers from the configured memory.Allocator instead of make, tracking each allocation and freeing it on Close, so they are visible to CheckedAllocator and can be pooled. Cap the initial buffer at min(1 MiB, uncompressed page size) so small pages don't each grab a full buffer. Aliased values are now valid until the page is released, matching the existing ReadBatchInPage (alias) / ReadBatch (clone) contract. Reorder NewStreamBuffer params (mem, r, maxSize, onClose), document the real peak memory on EnablePageStreaming, and reword "materialized" to "non-streaming"/"read whole". CheckedAllocator alloc/free-balance tests added (unit and end-to-end). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
With EnablePageStreaming, a streaming data page's buf holds only the rep/def level region, so Page.Data returns just the levels. Document that on the interface rather than gating streaming off the public GetColumnPageReader, which would also disable streaming for the pqarrow (arrow) read path. Internal decoders read via levelBuffer()/valueBuffer() and never call Data() on a streaming page; the property is off by default, so existing readers are unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Cross-reference the level-only Page.Data behavior at the property where streaming is enabled, so callers see it at the decision point. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
I have tried my best to control the size (as well as the scope) of this PR, but it still exceeds |
…le codec Add the streaming coverage requested in review: - readLevelData BIT_PACKED branch (white-box; the writer only emits RLE) - ineligible-codec fallback: Snappy/LZ4_RAW with EnablePageStreaming must match the materialized read - skip path: discardStreaming via Skip - repeated column (maxRepLevel > 0): the V1 rep-level peel Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Point the caveat at the low-level GetColumnPageReader path and note that reading via RowGroup.Column()/ReadBatch is unaffected, so callers can tell at a glance whether enabling streaming touches their code. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Rename NewStreamBuffer's maxSize -> sizeHint (it's the initial size, not a hard cap; the buffer still grows for an oversized value). - Rename setStreamingValues/drainAndClose limit -> rawSrc. - Move streamablePhysicalType/streamableCodec to row_group_reader.go (their only caller). - Reword the Fill lifetime contract, the V2 level-read comment, a decoder-test comment, and trim the EnablePageStreaming doc. - Rename a test's shadowing `const cap` to batchSize. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Close() releases curPage so a direct PageReader user's streaming value stream (buffers + decompressor) is freed; Release() is made idempotent so the column-reader path (which also releases) doesn't double-free. - SeekToPageWithRow releases curPage before repositioning p.r, so a streaming page's drain-on-release reads from the old position, not the seeked-to offset (corrupted reads on >1 MiB partially-read pages). Tests: direct-Close allocator balance, cross-page skip, SeekToRow parity, and a large-page partial-read seek that reproduces the drain bug. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
discardStreaming used Fill(n*typeLen), which makes streamBuffer allocate the whole skipped region contiguously — a page-level burst on wide columns. Discard in buffer-bounded chunks instead (the bytes need not be contiguous), so peak stays ~the stream buffer regardless of n or typeLen. Add TestPageStreamingFLBASkip: FLBA round-trip + cross-page skip parity. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming NewReader allocated a fresh zstd.Decoder per data page, while the block-decode path reuses a singleton. Pool the streaming decoders and Reset them per page (Close returns to the pool via Reset(nil) instead of freeing), so EnablePageStreaming doesn't pay decoder init on columns with many small pages. Concurrency 1 keeps pooled decoders goroutine-free. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… test Remove the self-evident pool/Close/Reset comments, and drop the dedicated pool-reuse test (globalEncoderPool has none either); the zstd streaming round-trip/seek/skip tests already exercise the pooled decoder. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Collapse the four byte-array writer helpers into one parameterized writeByteArrayStream (repetition/levels/page size), with a thin writeStreamTestColumn wrapper for the common case. - Extract the shared ReadBatch drain loop into collectByteArray. - Drop test doc comments that only restate a self-explaining test name. No coverage change; ~120 fewer test lines. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- zstdcloser.Close early-returns when the decoder is already returned to the pool, so a double Close can't Put the same decoder twice. - Add TestPageStreamingEngaged: a required column's streaming page Data() holds only the (empty) level region, so the test fails if eligibility silently falls back to the whole-page read. - Add streaming decode error-path tests (truncated value, negative length, truncated FLBA). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The streaming value buffer handed values out via FillOwned and kept every backing buffer alive until the page closed, so peak memory was the whole uncompressed page — no better than reading it at once. Redesign the value buffer as a reusable chunk allocator: bytes from Fill stay valid until Recycle, and Decode aliases values in place (no copy), calling Recycle at entry to reclaim the previous batch. Because Decode again fills a full batch, level/value counts stay aligned and no pending coordination is needed. readBatchInPage clips the batch on a streaming page toward the average value width so one Decode's aliased values stay near the buffer size (best-effort; skewed sizes may overshoot). Decoded values are unchanged. - value buffer: Fill/Advance/Skip/Recycle chunk allocator, corrupt-length bound - add streaming.ClipBatch; only clips when the value region exceeds one buffer - tests: record-reader (pqarrow path) streaming read, buffer + clip edge cases Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- V2 streaming rejects a level region larger than the page body before allocating, so a corrupt header can't drive a huge allocation (V1 already bounded this) - releaseValueStream resets the page clip, so a pooled page reused as a non-streaming page is not clipped and under-read - Reset releases a live streaming page's value stream instead of dropping it Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
zeroshade
left a comment
There was a problem hiding this comment.
Re-review of the latest push — thanks for the very thorough follow-up. All three of my blocking items are addressed, and I verified locally (built ./parquet/..., ran the TestPageStreaming* suite with PARQUET_TEST_DATA set — green):
Page.Data()contract — documented on bothEnablePageStreamingandData(), with streaming kept to the eligible internal path viacolumnCanStream. Your rationale (off by default, pqarrow uses the same accessor, documented semantic) is sound.- Unbounded RLE level allocation — now bounded by the uncompressed page size in the streaming level read (
setStreamingValues), and the BIT_PACKED length uses the checked multiply. Both inline points resolved. - Coverage —
TestPageStreaming{ByteArraySkip,FLBASkip,Repeated,ReadBatchInPageRepeated,IneligibleCodecFallback,AllocatorBalance,PeakMemoryBounded,PageReaderCloseFreesStream}cover the skip / repeated / ineligible-codec / allocator-tracking paths I asked for.
Scope note: this pass verified the above + ran the streaming suite; I haven't deep-audited the newer perf commits (zstd decoder pooling, chunked FLBA skip, V2 streaming hardening). Since it's large and you mentioned wanting more testing, I'll hold a formal approve until you say it's ready — ping me for a final pass. Great iteration.
Rationale for this change
Addresses #865. Today the reader materializes each full uncompressed data page before decoding, so peak memory scales with the page's uncompressed size — a problem for files with very large data pages (e.g. a 1 GB compressed / 1.4 GB uncompressed page). This adds an opt-in path that keeps peak memory near the requested batch size for the common large-page shape.
Approach was discussed and agreed on the issue.
What changes are included in this PR
Opt-in via
ReaderProperties.EnablePageStreaming(default false, no behavior change). When set, eligible data pages skip full-page decompression and decode their values incrementally; every ineligible page falls back to the existing materialized path.Scope (first cut):
byte_arrayandfixed_len_byte_arrayUNCOMPRESSED/GZIP/BROTLI/ZSTD(explicit allowlist — Snappy is a raw block whereas itsNewReaderis the framed format, and LZ4_RAW has no streaming reader)Design:
parquet/internal/encoding/streaming: a smallValueBuffer— an incremental byte source that reads from anio.Reader, slides as values are consumed, grows to fit a single oversized value, and owns the page's stream (drained + closed on release).PlainByteArrayDecoder/PlainFixedLenByteArrayDecoderkeep their[]bytelogic byte-for-byte, gaining only asrcfield + a one-line guard inDecode/Discard. Streaming decode lives beside them;SetDataresetssrc, so a cached decoder reused for a materialized page reverts. Other decoders are untouched.LevelDecoder.SetData; values become aValueBufferover the compressed region. On page release the underlying stream is drained + closed so the reader lands on the next page header even if values were skipped.SetSource(streaming) orSetData(materialized). The publicPage/DataPageinterfaces are unchanged (the streaming accessor is reached via an unexported interface, so external implementations don't break).Are these changes tested?
Yes.
parquet/file/streaming_page_test.go): streaming vs materialized reads are asserted identical across V1/V2 and each codec, for required and nullable columns (the nullable case covers the V1 def-level peel + streamingDecodeSpaced), including values larger than a page and the stream buffer.Are there any user-facing changes?
One new opt-in field,
ReaderProperties.EnablePageStreaming(default false). No change to existing behavior or public interfaces.Follow-ups (out of scope here): fixed-width numeric PLAIN types, raw-Snappy and LZ4_RAW streaming, and a memory benchmark.