{"id":330,"date":"2023-06-14T22:31:33","date_gmt":"2023-06-15T02:31:33","guid":{"rendered":"https:\/\/www.schveiguy.com\/blog\/?p=330"},"modified":"2023-06-14T22:38:16","modified_gmt":"2023-06-15T02:38:16","slug":"new-iopipe-abstraction-segmentedpipe","status":"publish","type":"post","link":"https:\/\/www.schveiguy.com\/blog\/2023\/06\/new-iopipe-abstraction-segmentedpipe\/","title":{"rendered":"New iopipe Abstraction &#8211; SegmentedPipe"},"content":{"rendered":"\n<p>For those who know my library <a href=\"https:\/\/github.com\/schveiguy\/iopipe\">iopipe<\/a>, it has been pretty stagnant as of late. However, the other day, I needed to process a file line-by-line, along with N lines of context. Now, <a href=\"https:\/\/github.com\/schveiguy\/iopipe\/blob\/master\/examples\/search\/search.d\">I do have an example<\/a> in the iopipe library that shows how to do N lines of context, but it does this via a separately maintained list of line references. Such a thing is a pain to keep track of, and ironically works just like a buffer in iopipe does.<\/p>\n\n\n\n<p>So I thought, &#8220;what if I make an iopipe which is a pipe of lines, where each element is another line from the source pipe?&#8221; Element 0 is the first line in the buffer, element 1 is the next one, etc.<\/p>\n\n\n\n<p>What needs to be stored for this? If I store slices of the underlying window, those can change when data is released, which means reconstructing everything upon <code>release<\/code> (not ideal). If I store offsets into the source window, those also can change, but it&#8217;s probably more manageable. Instead of reconstructing all the lines, I can subtract the number of bytes removed from the source chain from each stored offset. Each position in my &#8220;offset buffer&#8221; is a number from 0 to <code>source.window.length<\/code>, and will be in increasing order. 
Then when I fetch an &#8220;element&#8221; from the pipe, it will slice the data out of <code>source.window<\/code> based on these endpoints.<\/p>\n\n\n\n<p>But that still means extra work on <code>release<\/code>, and it also invalidates any windows stored elsewhere (some of this can&#8217;t be helped). However, there&#8217;s a simple solution to this: if the first offset in the list is treated as the &#8220;origin&#8221;, then we can slice based on that being 0! As a bonus, it also tells us the position within the entire stream (since starting the line pipe) for each line.<\/p>\n\n\n\n<p>So I went about writing this, and I couldn&#8217;t believe how simple it was! I can copy it all here, since it&#8217;s pretty short (I&#8217;m using shortened methods here to save some space):<\/p>\n\n\n\n<pre class=\"wp-block-prismatic-blocks\"><code class=\"language-d\">struct SegmentedPipe(SourceChain, Allocator = GCNoPointerAllocator) {\n    private {\n        SourceChain source;\n        AllocatedBuffer!(size_t, Allocator, 16) buffer;\n    }\n\n    private this(SourceChain source) {\n        this.source = source;\n        auto nelems = buffer.extend(1);\n        assert(nelems == 1);\n        buffer.window[0] = 0; \/\/ initialize with an offset of 0.\n    }\n\n    mixin implementValve!source;\n\n    \/\/ the &quot;range&quot;\n    static struct Window {\n        private {\n            SegmentedPipe *owner;\n            size_t[] offsets; \/\/ all the offsets of each of the segments\n        }\n\n        \/\/ standard random-access-range fare\n        auto front() =&gt; this[0];\n        auto back() =&gt; this[$-1];\n        bool empty() =&gt; offsets.length &lt; 2; \/\/ needs at least 2 offsets to properly slice\n        void popFront() =&gt; offsets.popFront;\n        void popBack() =&gt; offsets.popBack;\n        size_t length() =&gt; offsets.length - 1;\n        alias opDollar = length;\n\n        auto opIndex(size_t idx) {\n            immutable base = 
owner.buffer.window[0]; \/\/ the pipe&#039;s first offset is the origin (index 0 of source.window)\n            return owner.source.window[offsets[idx] - base .. offsets[idx + 1] - base];\n        }\n    }\n\n    Window window() =&gt; Window(&amp;this, buffer.window);\n\n    size_t extend(size_t elements) {\n        \/\/ the requested count is ignored: each call adds at most one segment\n        \/\/ ensure we can get a new element\n        if(buffer.extend(1) == 0)\n            return 0; \/\/ can&#039;t get any more buffer space!\n        \/\/ extend the source chain with 0 (let it choose the amount); whatever arrives becomes one new segment\n        auto baseElems = source.extend(0);\n        if(baseElems == 0) {\n            \/\/ no new data\n            buffer.releaseBack(1);\n            return 0;\n        }\n        buffer.window[$-1] = buffer.window[$-2] + baseElems;\n        return 1;\n    }\n\n    void release(size_t elements) {\n        source.release(buffer.window[elements] - buffer.window[0]);\n        buffer.releaseFront(elements);\n    }\n}\n\n\/\/ factory\nauto segmentedPipe(Chain, Allocator = GCNoPointerAllocator)(Chain base) {\n    return SegmentedPipe!(Chain, Allocator)(base);\n}<\/code><\/pre>\n\n\n\n<p>For those not familiar with iopipe, the eponymous concept is similar to a range, but is essentially a sliding window of elements. <code>extend<\/code> gets more elements, <code>window<\/code> gives the current elements (as a random access range), and <code>release<\/code> forgets the front N elements from the window. In this way, you can completely control the buffer, and don&#8217;t have to allocate your own buffer for things.<\/p>\n\n\n\n<p>You might notice the comment &#8220;needs at least 2 offsets to properly slice&#8221;. That&#8217;s because each element is the span between two consecutive offsets, so N elements require N + 1 offsets. Now, I could special-case one end (e.g. the last element) so I don&#8217;t have to store that extra offset, but the code is so much nicer with a sentinel instead.<\/p>\n\n\n\n<p>So how do we use it to get lines? What we need is an iopipe that extends one line at a time. That&#8217;s exactly what <code>iopipe.textpipe.byLine<\/code> does. 
The code looks like this:<\/p>\n\n\n\n<pre class=\"wp-block-prismatic-blocks\"><code class=\"language-d\">auto lines = File(filename, mode!&quot;r&quot;).refCounted\n    .bufd \/\/ buffered\n    .assumeText \/\/ assume it&#039;s UTF-8\n    .byLine \/\/ extend one line at a time\n    .segmentedPipe; \/\/ store lines in a buffer<\/code><\/pre>\n\n\n\n<p>And I was kind of shocked when this built and worked the first time. You know an abstraction is good when it writes easy, reads easy, everything is a simple composition of existing APIs, and it just works!<\/p>\n\n\n\n<p>Expect this to be in iopipe soon. I want to add some more features: I&#8217;d like to be able to get the offset of each element, and to have some way to store more information from the underlying pipe\/process. I think I might replace <a href=\"https:\/\/github.com\/schveiguy\/jsoniopipe\">jsoniopipe&#8217;s<\/a> <code>JsonTokenizer<\/code> with a <code>JsonTokenPipe<\/code>, and build things on top of that (e.g. validator, skip, etc.). That would actually supersede the awkward cache system. Maybe I can get rid of the awkwardness of getting the string data too? One can only dream&#8230;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>For those who know my library iopipe, it has been pretty stagnant as of late. However, the other day, I needed to process a file line-by-line, along with N lines of context. 
Now, I do have an example in the iopipe library that shows how to do N lines of context, but it does this [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[6,2],"tags":[11,14],"class_list":["post-330","post","type-post","status-publish","format-standard","hentry","category-dlang","category-prog","tag-dlang","tag-iopipe"],"_links":{"self":[{"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/posts\/330"}],"collection":[{"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/comments?post=330"}],"version-history":[{"count":9,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/posts\/330\/revisions"}],"predecessor-version":[{"id":339,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/posts\/330\/revisions\/339"}],"wp:attachment":[{"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/media?parent=330"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/categories?post=330"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.schveiguy.com\/blog\/wp-json\/wp\/v2\/tags?post=330"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}