[bitsail][connector] Doris batch replace model use recordStream buffer #305
DongLiang-0 wants to merge 3 commits into bytedance:master
Conversation
        key(WRITER_PREFIX + "sink_flush_interval_ms")
            .defaultValue(5000);

    ConfigOption<Integer> SINK_CHECK_INTERVAL =
SINK_CHECK_INTERVAL represents a time interval, so its value should not be too large.
    }

    private synchronized void startLoad(List<byte[]> flushCache) throws IOException {
        this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(), true);
The label may be duplicated when multiple tasks start loading at the same time, so should the label prefix contain the subtask ID?
The label is generated from a timestamp. Because this method is synchronized, only one thread can enter it at a time.
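For readers following this thread: `synchronized` on an instance method only serializes callers of the same object inside one JVM, so it does not by itself prevent two separate subtasks from producing the same timestamp-based label. A minimal sketch of the reviewer's suggestion is below. `SubtaskLabelGenerator`, its constructor arguments, and the label layout are hypothetical and are not the `LabelGenerator` class from this PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical generator; NOT the LabelGenerator used in the PR. */
final class SubtaskLabelGenerator {
    private final String labelPrefix;
    private final int subtaskId;
    // Per-instance counter so two labels built in the same millisecond still differ.
    private final AtomicLong sequence = new AtomicLong();

    SubtaskLabelGenerator(String labelPrefix, int subtaskId) {
        this.labelPrefix = labelPrefix;
        this.subtaskId = subtaskId;
    }

    /** Builds prefix_subtaskId_timestamp_sequence (assumed layout). */
    String generateLabel(long timestampMillis) {
        return labelPrefix + "_" + subtaskId + "_" + timestampMillis
            + "_" + sequence.getAndIncrement();
    }
}
```

With this layout, two subtasks that share a prefix and call `generateLabel` with the same timestamp still get distinct labels, because the subtask ID differs.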
    private AtomicInteger cacheRecordSize;
    private AtomicInteger cacheRecordCount;
    private volatile boolean loading = false;
    private final ArrayList<byte[]> cache = new ArrayList<>();
NIT: replace this with a linked queue? I think a queue fits better than a list here.
This is a good suggestion; I will improve this.
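One way the queue suggestion could look is sketched here, assuming the cache is filled by a writer thread and emptied by a flush step. `RecordCache` and its methods are hypothetical names, and `ConcurrentLinkedQueue` is only one possible choice of "linked queue".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical cache wrapper; not code from the PR. */
final class RecordCache {
    // FIFO queue that is safe for concurrent offer/poll without extra locking.
    private final Queue<byte[]> cache = new ConcurrentLinkedQueue<>();

    void add(byte[] record) {
        cache.offer(record);
    }

    /** Removes and returns everything currently queued, in insertion order. */
    List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>();
        byte[] record;
        while ((record = cache.poll()) != null) {
            batch.add(record);
        }
        return batch;
    }
}
```

Draining with `poll()` removes each record as it is handed to the flush, so there is no separate clear step that could drop records added in between.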
    this.cacheRecordSize = new AtomicInteger();
    this.cacheRecordCount = new AtomicInteger();
    this.scheduler = Executors.newScheduledThreadPool(1,
        new BasicThreadFactory.Builder().namingPattern("Doris-replace-writer").daemon(true).build());
Why is this thread a daemon?
I think that once it is set as a daemon thread, the JVM does not need to manage the checkDone task when the user threads exit.
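For context, a daemon thread does not keep the JVM alive, and it is simply abandoned when the last non-daemon thread exits, so any in-flight work on it is not guaranteed to finish. The sketch below shows the same scheduler setup using only the JDK instead of `BasicThreadFactory`. `DaemonSchedulerSketch` is a hypothetical name, and whether the writer shuts the scheduler down explicitly on close is not shown in this diff.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper; JDK-only equivalent of the factory in the diff. */
final class DaemonSchedulerSketch {
    static ThreadFactory daemonFactory(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            // Daemon threads never block JVM exit.
            thread.setDaemon(true);
            return thread;
        };
    }

    static ScheduledExecutorService newScheduler() {
        return Executors.newScheduledThreadPool(1, daemonFactory("Doris-replace-writer"));
    }
}
```

Even with a daemon thread, calling `shutdown()` on the scheduler when the writer closes keeps the timer from firing against a writer that is already being torn down.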
    this.dorisBatchBuffers = new ArrayList(dorisExecutionOptions.getBufferCount());
    this.dorisOptions = dorisOptions;
    this.labelGenerator = new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC());
    this.recordStream = new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount());
Does this need to be a field of DorisReplaceProxy?
I will improve this.
        buf.put(flushCache.get(i));
    }
    byte[] array = buf.array();
    dorisStreamLoad.writeRecord(buf.array());
Why does writeRecord need to be invoked twice here?
I forgot to delete this when I removed the redundant code at the end; I will fix it.
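A minimal sketch of the merge step with a single resulting array, so that the caller has exactly one value to pass to one `writeRecord` call. `FlushBufferSketch.concat` is a hypothetical helper. How the PR sizes `buf`, and whether it adds line delimiters between records, is not visible in this fragment.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical helper; not code from the PR. */
final class FlushBufferSketch {
    /** Concatenates the cached records into one exactly sized array. */
    static byte[] concat(List<byte[]> flushCache) {
        int total = 0;
        for (byte[] record : flushCache) {
            total += record.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] record : flushCache) {
            buf.put(record);
        }
        // A heap buffer from allocate(total) is backed by an array of length total.
        return buf.array();
    }
}
```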
        LOG.info("not loading, skip timer checker");
        return;
    }
    if (dorisStreamLoad.getPendingLoadFuture() != null
I have a question about this line: can the pending load future change during the interval? I think the right sequence should be:
- Future pendingLoadFuture = dorisStreamLoad.getPendingLoadFuture();
- check that pendingLoadFuture is the same as dorisStreamLoad.getPendingLoadFuture()
- check whether pendingLoadFuture is done
I don't think this is a problem. As long as dorisStreamLoad.getPendingLoadFuture() is not done, it needs to be stopped.
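A sketch of the first step the reviewer describes, reading the getter once into a local variable so that the null check and the `isDone()` check apply to the same object even if another thread replaces the pending future in between. `PendingLoadCheck` and the `Supplier` parameter are hypothetical stand-ins for `dorisStreamLoad.getPendingLoadFuture()`. The rest of the condition in the diff is not shown, so this covers only the part that is.

```java
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Hypothetical helper; not code from the PR. */
final class PendingLoadCheck {
    /** Returns true when a load has been started and has not finished yet. */
    static boolean isStillLoading(Supplier<Future<?>> pendingLoadFuture) {
        // Single read: both checks below see the same Future instance.
        Future<?> snapshot = pendingLoadFuture.get();
        return snapshot != null && !snapshot.isDone();
    }
}
```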
Signed-off-by:
Pre-Checklist
Note: Please complete ALL items in the following checklist.
Purpose
Some description about what this PR wants to do.
Approaches
Some description about how this PR achieves the purpose.
Related Issues
e.g. Close #796
New Behavior (screenshots if needed)
N/A