Projects
Eulaceura:Factory
kafka
_service:obs_scm:0015-SessionWindows-closed-ear...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:0015-SessionWindows-closed-early.patch of Package kafka
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 5648e8f0a3..24e9e21ad7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce final long timestamp = context().timestamp(); observedStreamTime = Math.max(observedStreamTime, timestamp); - final long closeTime = observedStreamTime - windows.gracePeriodMs(); + final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index ab2adbfbb1..244ea9f4fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest { context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("OnTime1", "1"); - // dummy record to advance stream time = 1 - context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); - processor.process("dummy", "dummy"); + // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window + context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); + processor.process("dummy", "dummy"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) { @@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest { assertThat( appender.getMessages(), hasItem("Skipping record for expired window." + - " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]") + " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]") ); } @@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest { context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("OnTime1", "1"); - // dummy record to advance stream time = 1 - context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); - processor.process("dummy", "dummy"); + // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window + context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders())); + processor.process("dummy", "dummy"); // delayed record arrives on time, should not be skipped context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("OnTime2", "1"); - // dummy record to advance stream time = 2 - context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null)); - processor.process("dummy", "dummy"); + // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window + context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders())); + processor.process("dummy", "dummy"); // delayed record arrives late context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); @@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest { assertThat( appender.getMessages(), hasItem("Skipping record for expired window." + - " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]") + " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 46a8ab8dcf..e0b7957e01 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -581,7 +581,7 @@ public class SuppressScenarioTest { // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace inputTopic.pipeInput("k1", "v1", 1L); // any record in the same partition advances stream time (note the key is different) - inputTopic.pipeInput("k2", "v1", 6L); + inputTopic.pipeInput("k2", "v1", 11L); // late event for first window - this should get dropped from all streams, since the first window is now closed. inputTopic.pipeInput("k1", "v1", 5L); // just pushing stream time forward to flush the other events through. @@ -594,7 +594,7 @@ public class SuppressScenarioTest { new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L), new KeyValueTimestamp<>("[k1@0/5]", null, 5L), new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), - new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L), + new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L), new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L) ) ); @@ -602,7 +602,7 @@ public class SuppressScenarioTest { drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), asList( new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), - new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L) + new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L) ) ); }
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2