Commit 9125a14c authored by Joshua Gutow's avatar Joshua Gutow

WIP: Better re-assemble

parent 770c1859
...@@ -15,9 +15,10 @@ import ( ...@@ -15,9 +15,10 @@ import (
type ChannelWithMeta struct { type ChannelWithMeta struct {
ID derive.ChannelID `json:"id"` ID derive.ChannelID `json:"id"`
SkippedFrames []FrameWithMetadata `json:"skipped_frames"`
IsReady bool `json:"is_ready"` IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
Frames []FrameWithMetadata `json:"frames"` Frames []FrameWithMetadata `json:"frames"`
SkippedFrames []FrameWithMetadata `json:"skipped_frames"`
} }
type FrameWithMetadata struct { type FrameWithMetadata struct {
...@@ -71,20 +72,78 @@ func Channels(config Config) { ...@@ -71,20 +72,78 @@ func Channels(config Config) {
} }
func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMeta { func processFrames(id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMeta {
// TO DO: Use the same approach as in derivation. // This code is roughly copied from rollup/derive/channel.go
ready := false // We will use that file to reconstruct the batches, but need to implement this manually
// to figure out which frames got pruned.
var skippedFrames []FrameWithMetadata
framesByNumber := make(map[uint16]FrameWithMetadata)
closed := false
var endFrameNumber, highestFrameNumber uint16
for _, frame := range frames { for _, frame := range frames {
ready = frame.Frame.IsLast || ready if frame.Frame.IsLast && closed {
fmt.Println("Trying to close channel twice")
skippedFrames = append(skippedFrames, frame)
continue
}
if _, ok := framesByNumber[frame.Frame.FrameNumber]; ok {
fmt.Println("Duplicate frame")
skippedFrames = append(skippedFrames, frame)
continue
}
if closed && frame.Frame.FrameNumber >= endFrameNumber {
fmt.Println("Frame number past the end of the channel")
skippedFrames = append(skippedFrames, frame)
continue
}
framesByNumber[frame.Frame.FrameNumber] = frame
if frame.Frame.IsLast {
endFrameNumber = frame.Frame.FrameNumber
closed = true
}
if frame.Frame.IsLast && endFrameNumber < highestFrameNumber {
// Do a linear scan over saved inputs instead of ranging over ID numbers
for id, prunedFrame := range framesByNumber {
if id >= endFrameNumber {
skippedFrames = append(skippedFrames, prunedFrame)
}
}
highestFrameNumber = endFrameNumber
}
if frame.Frame.FrameNumber > highestFrameNumber {
highestFrameNumber = frame.Frame.FrameNumber
}
} }
ready := chReady(framesByNumber, closed, endFrameNumber)
if !ready { if !ready {
fmt.Printf("Found channel that was not closed: %v\n", id.String()) fmt.Printf("Found channel that was not closed: %v\n", id.String())
} }
return ChannelWithMeta{ return ChannelWithMeta{
ID: id, ID: id,
Frames: frames, Frames: frames,
SkippedFrames: nil, SkippedFrames: skippedFrames,
IsReady: ready, IsReady: ready,
InvalidFrames: len(skippedFrames) != 0,
}
}
func chReady(inputs map[uint16]FrameWithMetadata, closed bool, endFrameNumber uint16) bool {
if !closed {
return false
}
if len(inputs) != int(endFrameNumber)+1 {
return false
}
// Check for contiguous frames
for i := uint16(0); i <= endFrameNumber; i++ {
_, ok := inputs[i]
if !ok {
return false
}
} }
return true
} }
func transactionsToFrames(txns []fetch.TransactionWithMeta) []FrameWithMetadata { func transactionsToFrames(txns []fetch.TransactionWithMeta) []FrameWithMetadata {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment