-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathaligned_chunk_reader.h
More file actions
189 lines (174 loc) · 7.84 KB
/
aligned_chunk_reader.h
File metadata and controls
189 lines (174 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef READER_CHUNK_ALIGNED_READER_H
#define READER_CHUNK_ALIGNED_READER_H
#include "common/allocator/my_string.h"
#include "common/tsfile_common.h"
#include "compress/compressor.h"
#include "encoding/decoder.h"
#include "file/read_file.h"
#include "reader/filter/filter.h"
#include "reader/ichunk_reader.h"
namespace storage {
class AlignedChunkReader : public IChunkReader {
public:
AlignedChunkReader()
: read_file_(nullptr),
time_chunk_meta_(nullptr),
value_chunk_meta_(nullptr),
measurement_name_(),
time_chunk_header_(),
value_chunk_header_(),
cur_time_page_header_(),
cur_value_page_header_(),
time_in_stream_(),
value_in_stream_(),
file_data_time_buf_size_(0),
file_data_value_buf_size_(0),
time_chunk_visit_offset_(0),
value_chunk_visit_offset_(0),
time_compressor_(nullptr),
value_compressor_(nullptr),
time_filter_(nullptr),
time_decoder_(nullptr),
value_decoder_(nullptr),
time_in_(),
value_in_(),
time_uncompressed_buf_(nullptr),
value_uncompressed_buf_(nullptr),
cur_value_index(-1) {}
int init(ReadFile* read_file, common::String m_name,
common::TSDataType data_type, Filter* time_filter) override;
void reset() override;
void destroy() override;
~AlignedChunkReader() override = default;
bool has_more_data() const override {
return prev_value_page_not_finish() ||
(value_chunk_visit_offset_ -
value_chunk_header_.serialized_size_ <
value_chunk_header_.data_size_) ||
prev_time_page_not_finish() ||
(time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ <
time_chunk_header_.data_size_);
}
ChunkHeader& get_chunk_header() override { return value_chunk_header_; }
int load_by_aligned_meta(ChunkMeta* time_meta,
ChunkMeta* value_meta) override;
int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
common::PageArena& pa) override;
int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
common::PageArena& pa, int64_t min_time_hint,
int& row_offset, int& row_limit) override;
private:
bool should_skip_page_by_time(int64_t min_time_hint);
bool should_skip_page_by_offset(int& row_offset);
FORCE_INLINE bool chunk_has_only_one_page(
const ChunkHeader& chunk_header) const {
return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) ==
ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
}
int alloc_compressor_and_decoder(storage::Decoder*& decoder,
storage::Compressor*& compressor,
common::TSEncoding encoding,
common::TSDataType data_type,
common::CompressionType compression_type);
int get_cur_page_header(ChunkMeta*& chunk_meta,
common::ByteStream& in_stream_,
PageHeader& cur_page_header_,
uint32_t& chunk_visit_offset,
ChunkHeader& chunk_header);
int read_from_file_and_rewrap(common::ByteStream& in_stream_,
ChunkMeta*& chunk_meta,
uint32_t& chunk_visit_offset,
int32_t& file_data_buf_size,
int want_size = 0, bool may_shrink = true);
bool cur_page_statisify_filter(Filter* filter);
int skip_cur_page();
int decode_cur_time_page_data();
int decode_cur_value_page_data();
int decode_time_value_buf_into_tsblock(common::TsBlock*& ret_tsblock,
Filter* filter,
common::PageArena* pa);
bool prev_time_page_not_finish() const {
return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
time_in_.has_remaining();
}
bool prev_value_page_not_finish() const {
return (value_decoder_ && value_decoder_->has_remaining(value_in_)) ||
value_in_.has_remaining();
}
int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream& time_in,
common::ByteStream& value_in,
common::TsBlock* ret_tsblock,
Filter* filter,
common::PageArena* pa);
int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
common::ByteStream& value_in,
common::RowAppender& row_appender,
Filter* filter);
int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
common::ByteStream& value_in,
common::RowAppender& row_appender,
common::PageArena& pa,
Filter* filter);
private:
ReadFile* read_file_;
ChunkMeta* time_chunk_meta_;
ChunkMeta* value_chunk_meta_;
common::String measurement_name_;
ChunkHeader time_chunk_header_;
// TODO: support reading more than one measurement in AlignedChunkReader.
ChunkHeader value_chunk_header_;
PageHeader cur_time_page_header_;
PageHeader cur_value_page_header_;
/*
* Data reader from file is stored in @in_stream_, and the size
* is stored in @file_data_buf_size_. Note, in_stream_.total_size_
* is used to limit deserialization, that is why we still have
* @file_data_buf_size_.
*
* Since we may want keep data of current page (and page header
* of next page) in memory, we need a byte-size cursor to tell
* us which byte we are processing, so we have @chunk_visit_offset_
* it refer to position from the start of chunk_header_,
* also refer to offset within the chunk (including chunk header).
* It advanced by step of a page header or a page tv data.
*/
common::ByteStream time_in_stream_;
common::ByteStream value_in_stream_;
int32_t file_data_time_buf_size_;
int32_t file_data_value_buf_size_;
uint32_t time_chunk_visit_offset_;
uint32_t value_chunk_visit_offset_;
// Statistic *page_statistic_;
Compressor* time_compressor_;
Compressor* value_compressor_;
Filter* time_filter_;
Decoder* time_decoder_;
Decoder* value_decoder_;
common::ByteStream time_in_;
common::ByteStream value_in_;
char* time_uncompressed_buf_;
char* value_uncompressed_buf_;
std::vector<uint8_t> value_page_col_notnull_bitmap_;
uint32_t value_page_data_num_;
int32_t cur_value_index;
};
} // end namespace storage
#endif // READER_CHUNK_READER_H