forked from qitab/cl-protobufs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffers.lisp
883 lines (820 loc) · 43.2 KB
/
buffers.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
;;; Copyright 2012-2020 Google LLC
;;;
;;; Use of this source code is governed by an MIT-style
;;; license that can be found in the LICENSE file or at
;;; https://opensource.org/licenses/MIT.
(in-package #:cl-protobufs.implementation)
;;; This file provides a stream-like abstraction, a BUFFER, that Protobuf serialization
;;; logic can use to perform a one-pass traversal of the input object tree such that
;;; all variable-length pieces are properly length-prefixed but without having to
;;; precompute lengths. This differs from the C implementation of serialization,
;;; which (by default) requires a pre-pass to compute the lengths for all constituent
;;; variable-length pieces such as strings and sub-messages.
(eval-when (:compile-toplevel :load-toplevel :execute)
(defparameter $optimize-buffering *optimize-fast-unsafe*)) ; NOLINT
(deftype array-index ()
#+sbcl 'sb-int:index
#-sbcl `(integer 0 ,(1- array-total-size-limit)))
;; A BUFFER is a linked list of blocks (vectors) of unsigned-byte.
;; It can more-or-less be thought of as a string-output-stream that accepts
;; (UNSIGNED-BYTE n) as the element-type, instead of character, and which
;; allows replacement of previously written bytes. CONCATENATE-BLOCKS
;; is the analogous operation to GET-OUTPUT-STREAM-STRING. It produces a
;; single vector of all bytes that were written.
;; This structure has subtypes for 8-bit octets and 32-bit words.
(defstruct (buffer (:constructor nil))
;; The current block
(block nil :type (simple-array * 1))
;; Index into current block at which next element may be written.
;; The block is full when index is equal to (LENGTH BLOCK).
(index 0 :type (unsigned-byte 28))
;; The entire list of blocks
(chain nil :type cons)
;; The cons cell whose car is BLOCK. This slot acts primarily
;; to optimize nconc onto CHAIN. It is not necessarily the last
;; cons in CHAIN, but usually it is.
(next nil :type cons)
;; Zero-based absolute position of the first element of this block in
;; the overall output. Updated only when assigning a new BLOCK.
(%block-absolute-start 0 :type array-index))
(defmethod print-object ((self buffer) stream)
(print-unreadable-object (self stream :type t :identity t)))
;; BUFFER-SAP is a macro because it makes little sense to write a function
;; that returns a pointer to something that can go stale on you.
;; Otherwise any extraction of a SAP from the buffer would be reliable only
;; within the scope of a WITHOUT-GCING or WITH-PINNED-OBJECTS.
;; It would work as an inline function, but this forces the right behavior.
#+sbcl
(defmacro buffer-sap (buffer)
`(sb-sys:vector-sap (buffer-block ,buffer)))
(defun-inline buffer-block-capacity (buffer)
(declare (optimize (safety 0)))
(length (buffer-block buffer)))
(defun-inline buffer-absolute-position (buffer)
(i+ (buffer-%block-absolute-start buffer)
(buffer-index buffer)))
(defun make-buffer (constructor block)
(let ((chain (list block)))
(funcall (the function constructor) block chain chain)))
(deftype octet-type () '(unsigned-byte 8))
(deftype word-buffer-block-type () '(simple-array (unsigned-byte 32) 1))
(defstruct (word-buffer (:include buffer (block nil :type word-buffer-block-type))
(:constructor %make-word-buffer (block chain next))))
(defun make-word-buffer (size)
(declare (array-index size))
(make-buffer #'%make-word-buffer
(make-array size :element-type '(unsigned-byte 32))))
(defstruct (octet-buffer (:include buffer
(block nil :type (simple-array octet-type 1)))
(:constructor %make-octet-buffer (block chain next)))
;; The collection of backpatches is itself a word buffer
(backpatches (make-word-buffer 10))
;; When copying a fixed-size wire-level primitive that crosses a block boundary,
;; use the scratchpad first, then copy two subsequences of octets.
(scratchpad (make-array 8 :element-type '(unsigned-byte 8)))
(n-gap-bytes 0 :type fixnum)
(target nil) ; the destination of these octets, a STREAM typically
;; The BUFFER can also pretend to be stream by implementing CHAR-OUT
;; and STRING-OUT methods. The buffer and stream point to each other.
;; The stream is created only if needed. No support for non-SBCL Lisps.
#+sbcl
(stream nil :type (or null sb-kernel:ansi-stream))
;; The library does not use this slot, but applications may.
;; Because the structure type gets frozen (below) it is impolite/incorrect
;; to create subtypes of it having additional slots.
(userdata))
;; This declaration asserts that there wil not be further descendant types,
;; and promises to the compiler that TYPEP on the two buffer subtypes
;; need only be a simple EQ check.
#+sbcl
(declaim (sb-ext:freeze-type word-buffer octet-buffer))
(defun make-octet-buffer (size &key userdata target)
(declare (array-index size))
(let ((b (make-buffer #'%make-octet-buffer
(make-array size :element-type 'octet-type))))
(setf (octet-buffer-userdata b) userdata
(octet-buffer-target b) target)
b))
;; Allocate but do not link in a new block of at least MIN-SIZE, which can be zero
;; for the default growth rate of 1.5x the previous allocation.
;; A clever way to make an array of the right kind would be to use introspection
;; on the TYPE of the CURRENT-BLOCK slot. But clever = slow, so use ETYPECASE instead.
(defun new-block (buffer min-size)
(declare (array-index min-size))
;; For testing the algorithm without growth of buffers - to make it more likely that
;; data will span buffers - the new-capacity could be (max min-size 128) or similar.
;; It must never be smaller than the largest primitive type though.
(let* ((old-capacity (buffer-block-capacity buffer))
(new-capacity
(max min-size
(min (+ old-capacity (ash old-capacity 1)) 100000))))
(etypecase buffer
(word-buffer (make-array new-capacity :element-type '(unsigned-byte 32)))
(octet-buffer (make-array new-capacity :element-type 'octet-type)))))
;; After having ensured sufficient space, the "FAST-" output algorithms can avoid
;; allocating blocks, but might have to advance the block pointer with ADVANCE-BLOCK.
;; This gets called exponentially less often as block size is automatically grown,
;; so dot not benefit from being inlined.
;; Note that this DOES NOT set the 'current-index' slot to 0.
(declaim (ftype (function (buffer) (values (simple-array octet-type 1) &optional))
advance-block))
(defun advance-block (buffer)
(declare #.$optimize-buffering)
;; this INCF generates 6 instructions instead of 1. wth?
(incf (buffer-%block-absolute-start buffer)
(length (buffer-block buffer)))
(let ((tail (cdr (buffer-next buffer))))
(setf (buffer-next buffer) tail
(buffer-block buffer) (car tail))))
;; Create a new block such that there will be at least N bytes available in
;; total across the current and new block, given that BUFFER-ENSURE-SPACE [q.v.]
;; has already decided there is not presently enough space.
;; The new block's size is the greater of the defecit or the standard growth
;; amount. If there is zero space in the current block, the new block is set
;; as the current block, otherwise it is not.
;; Return true if all data will fit in the current block; NIL otherwise.
(declaim (ftype (function (t t) (values t &optional)) %buffer-ensure-space))
(defun %buffer-ensure-space (buffer n)
(declare ((and fixnum unsigned-byte) n) #.$optimize-buffering)
(let* ((capacity (buffer-block-capacity buffer))
(space-remaining (- capacity (buffer-index buffer)))
(defecit (the fixnum (- n space-remaining))))
;; There might already be a next-block. This can happen if previous write asked
;; for more space than existed in the current block, but subsequently didn't
;; use any space in the new block. That block can be smaller than what is
;; needed now, but don't drop it - push a new next-block in front.
(unless (and (cdr (buffer-next buffer))
(>= (length (the vector (second (buffer-next buffer)))) defecit))
(rplacd (buffer-next buffer)
(cons (new-block buffer defecit) (cdr (buffer-next buffer)))))
(when (zerop space-remaining)
(advance-block buffer)
;; 0 serves as a generalized T, meaining all N bytes fit in one block
(setf (buffer-index buffer) 0))))
;; Guarantee that BUFFER has room for at least N more elements (words or octets)
;; considering its current block and possibly one new block.
;; If all N elements fit into the current block, return true, else return NIL.
;; If exactly at the end of a block, the return value will be true because
;; the next block will contain all N bytes.
;; This inlined wrapper punts to the general case if available space is inadequate.
;;
(defun-inline buffer-ensure-space (buffer n)
(declare ((and fixnum unsigned-byte) n) #.$optimize-buffering)
(or (>= (- (buffer-block-capacity buffer) (buffer-index buffer)) n)
(%buffer-ensure-space buffer n)))
;; A SERIALIZED-PROTOBUF is the result of serializing in the one-pass algorithm
;; and then squashing out any of the gaps that were left by allocating length
;; prefixes in their largest possible size but not using all bytes.
;;
(defstruct (serialized-protobuf
(:constructor make-serialized-protobuf
(blocks total-length final-block-length)))
blocks
total-length
final-block-length)
(defmethod print-object ((self serialized-protobuf) stream)
(declare (stream stream))
(print-unreadable-object (self stream :type t)
(format stream "~D byte~:P" (serialized-protobuf-total-length self))))
(declaim (ftype (function (t t) (values t &optional))
word-out octet-out)
(inline word-out))
;; Define OCTET-OUT and WORD-OUT on the respective buffer types.
(macrolet
((define-emitter (name buffer-type element-type)
`(defun ,name (buffer val)
(declare (,buffer-type buffer) #.$optimize-buffering)
(let* ((block (buffer-block buffer))
(index (buffer-index buffer))
(capacity (length block)))
;; Structure's slot type isn't enough to provide type information
;; because of a later setq.
(declare ((simple-array ,element-type 1) block))
(when (>= index capacity)
(incf (buffer-%block-absolute-start buffer) capacity)
(setf block
;; see if space was pre-allocated
(cond ((cdr (buffer-next buffer))
(pop (buffer-next buffer))
(car (buffer-next buffer)))
(t
(let* ((next (new-block buffer 0))
(cell (list next)))
(setf (cdr (buffer-next buffer)) cell
(buffer-next buffer) cell)
next)))
(buffer-block buffer) block
index 0))
(setf (aref block index) val
(buffer-index buffer) (1+ index))))))
(define-emitter word-out word-buffer (unsigned-byte 32))
(define-emitter octet-out octet-buffer octet-type))
(defun %fast-octet-out (buffer val)
(let ((block (advance-block buffer)))
(setf (aref block 0) val
(buffer-index buffer) 1)))
;; Perform OCTET-OUT, but if the current block can hold no more,
;; assume existence of a pre-made next block.
(defun-inline fast-octet-out (buffer val)
(declare (octet-buffer buffer) #.$optimize-buffering)
(let* ((block (buffer-block buffer))
(index (buffer-index buffer)))
(declare ((simple-array octet-type 1) block))
(if (i< index (length block))
(setf (aref block index) val (buffer-index buffer) (1+ index))
(%fast-octet-out buffer val)))) ; punt
;; Rapidly copy all of OCTETS into BUFFER as if by FAST-OCTET-OUT.
;; Space must have been ensured so that at most one additional block beyond
;; the current-block is needed.
;;
(defun fast-octets-out (buffer octets
&aux (input-length (length octets)))
(declare (octet-buffer buffer) (optimize (safety 0))
((simple-array octet-type 1) octets)
((unsigned-byte 32) input-length))
(unless (zerop input-length)
(let* ((block (buffer-block buffer))
(index (buffer-index buffer))
(available-space (- (length block) index)))
(declare ((simple-array octet-type 1) block))
;; ENSURE-SPACE always leaves room for at least 1 octet in the current block,
;; and even if it left zero this code would still be correct.
(let ((n (min available-space input-length)))
(replace block octets :start1 index)
(incf index n)
(decf input-length n))
(when (plusp input-length)
;; There is more input. This can only happen if the block's
;; capacity was reached.
;; The starting index of the source of the copy is the number
;; of bytes that were already written into the first block.
(replace (advance-block buffer) octets
:start2 available-space)
;; The ending index in the current block is whatever was just
;; copied, since the starting index for writing was 0.
(setq index input-length))
(setf (buffer-index buffer) index))))
;; Bind ITER to an iterator over WORD-BUFFER in the manner of standard
;; WITH-{mumble}-ITERATOR macros. Each time ITER is invoked, the next
;; buffer element will be returned, or NIL if no more remain.
(defmacro with-word-buffer-iterator ((iterator-name word-buffer) &body body)
(with-gensyms (buffer block more-blocks input-pointer input-limit)
`(let* ((,buffer, word-buffer)
(,block ,(coerce #() 'word-buffer-block-type))
;; if the current block's index is 0, then no blocks were used at all
(,more-blocks (unless (zerop (buffer-index ,buffer))
(buffer-chain ,buffer)))
(,input-pointer 0)
(,input-limit 0))
(declare (word-buffer-block-type ,block)
(array-index ,input-pointer ,input-limit))
(macrolet
((,iterator-name ()
`(locally
(declare (optimize (safety 0)))
(when (or (i< ,',input-pointer ,',input-limit)
(when ,',more-blocks
(setq ,',block (pop ,',more-blocks)
,',input-limit
(if ,',more-blocks
(length ,',block)
(buffer-index ,',buffer))
,',input-pointer 0)))
(aref ,',block (prog1 ,',input-pointer (incf ,',input-pointer)))))))
,@body))))
;; Put blank space into an octet buffer so that later we can go back and
;; patch a length-prefix in.
;; Return fives values: absolute stream position, the cons cell pointing
;; to the block in which the first octet would be written, and the index to
;; that octet, and a pointer to the block in the buffer of deletions that
;; will be performed on finalization, and a pointer into that block.
;; Multiple values avoid consing anything to represent saved buffer locations.
(declaim (ftype (function (t) (values t t t t t &optional))
emit-placeholder))
(defun emit-placeholder (buffer)
(declare #.$optimize-buffering)
;; ABS-POS doesn't change even if BUFFER-ENSURE-SPACE advances a block
;; so the first two bindings are actually order-insensitive,
;; but the capturing of BUFFER-NEXT must occur after ENSURE-SPACE.
;; A length-prefix placeholder reserves 4 octets which is enough to represent
;; a 28-bit integer (the other bit of each octet being the "more-to-go" flag).
;; Given the suggested message size limit of a few megabytes, this is fine.
(symbol-macrolet ((reserve-bytes 4))
(let ((within-block-p (buffer-ensure-space buffer reserve-bytes))
(abs-pos (buffer-absolute-position buffer))
(blocks (buffer-next buffer))
(index (buffer-index buffer)))
(setf (buffer-index buffer)
(if within-block-p
(+ index reserve-bytes)
(let ((available-space (- (buffer-block-capacity buffer) index)))
(advance-block buffer)
(- reserve-bytes available-space))))
;; A place is reserved in the deletion buffer to hold a pointer to
;; the place in the octet buffer that will probably be squeezed out.
;; This is done now, so that indices stored are monotonic.
;; Were that not done, and backpatching recorded deletion markers
;; only at the time of making the patch, the deletion markers would
;; not be in ascending order - they would have a "treelike" appearance
;; based on the order in which submessages were completed.
(let ((patch-buffer (octet-buffer-backpatches buffer)))
(word-out patch-buffer 0)
(values abs-pos blocks index
(buffer-block patch-buffer)
(1- (buffer-index patch-buffer)))))))
;; Patch VAL into the octet buffer by changing the contents of VAL's block at
;; the specified indices using 'varint' encoding, and also record a pointer
;; to the range of octets which were reserved for VAL but not consumed by it.
;; Return the number of bytes used to store VAL.
(declaim (ftype (function (t t t t t t t) (values fixnum &optional))
backpatch-varint))
(defun backpatch-varint (val buffer abs-pos blocks index pointer-block pointer-index)
(declare #.$optimize-buffering)
(declare (type (unsigned-byte 32) val)
((simple-array (unsigned-byte 32) 1) pointer-block)
(array-index index pointer-index))
(let* ((block (first blocks)) (limit (length block)) (count 0))
(declare ((simple-array octet-type 1) block) (fixnum count))
;; Seven bits at a time, least significant bits first
(loop do (let ((bits (ildb (byte 7 0) val)))
(declare (octet-type bits))
(setq val (iash val -7))
(when (>= index limit)
;; This doesn't bother updating LIMIT to its "proper" new value.
;; It can't possibly be any smaller than a varint.
(setf index 0 block (second blocks)))
(setf (aref block index) (ilogior bits (if (i= val 0) 0 128)))
(iincf index)
(incf count))
until (i= val 0))
;; Record the location of the backpatch so that the unused bytes can be
;; squashed out later. This is done even if all 4 bytes were used,
;; because a place was aleady reserved in the word-buffer for this backpatch.
(cond ((<= count 4)
;; Encode the deletion using 2 bits for the deletion count (0 .. 3)
;; ORed with the index at which to delete shifted left 2 bits.
(let ((gap (i- 4 count)))
(setf (aref pointer-block pointer-index)
(ilogior (ash (i+ abs-pos count) 2) gap))
(incf (octet-buffer-n-gap-bytes buffer) gap)))
((> count 4)
(protobuf-error "Backpatch failure on ~S" buffer)))
count))
;; Execute BODY, capturing the state of BUFFER at the start, and *unless* a nonlocal
;; exit occurs, restore the state of the buffer prior to executing the body
;; and return no value.
(defmacro with-bookmark ((buffer) &body body)
(with-gensyms (block index next abs-pos)
`(let ((,block (buffer-block ,buffer))
(,index (buffer-index ,buffer))
(,next (buffer-next ,buffer))
(,abs-pos (buffer-%block-absolute-start ,buffer)))
,@body
(setf (buffer-block ,buffer) ,block
(buffer-index ,buffer) ,index
(buffer-next ,buffer) ,next
(buffer-%block-absolute-start ,buffer) ,abs-pos)
(values))))
;; Reserve space for a uint32 prior to the start of a variable-length subsequence
;; of buffer, and also reserve space in the backpatch buffer to point to the space
;; in the data buffer where unused reserved bytes should be squashed out.
(defmacro with-placeholder ((buffer &key position) &body body)
(let* ((name "PLACEHOLDER")
(abs
(or position
(make-symbol (concatenate 'string name "-OCTET-POSITION"))))
(blocks (make-symbol (concatenate 'string name "-OCTET-BLOCKS")))
(index (make-symbol (concatenate 'string name "-OCTET-INDEX")))
(pointer-block (make-symbol (concatenate 'string name "-POINTER-BLOCK")))
(pointer-index (make-symbol (concatenate 'string name "-POINTER-INDEX"))))
`(multiple-value-bind (,abs ,blocks ,index ,pointer-block ,pointer-index)
(emit-placeholder ,buffer)
(macrolet ((backpatch (value)
`(backpatch-varint ,value
,',buffer ,',abs ,',blocks ,',index
,',pointer-block ,',pointer-index)))
,@body))))
;; A simple wrapper on REPLACE. This function is used only in one place.
;; It shouldn't be needed, but small copies using REPLACE are slower than a loop.
;; It turns out that a foreign call to memmove would be faster for 80 bytes or more.
(defun-inline fast-replace (destination destination-index
source source-index count)
(declare (array-index destination-index count)
((simple-array octet-type 1) destination source))
(let ((limit (the array-index (+ destination-index count))))
(if (< count 40)
(loop (setf (aref destination destination-index) (aref source source-index))
(incf source-index)
(when (eql (incf destination-index) limit) (return)))
(replace destination source
:start1 destination-index :end1 limit
:start2 source-index))))
(defvar **empty-word-buffer** (make-word-buffer 0))
;; Given an octet-buffer BUFFER, squeeze out any octets which "do not exist" in
;; the virtual octet sequence so they no also longer exist in the physical sequence.
;; After this operation, BUFFER will be ready for direct consumption, such as
;; by a client or a compression algorithm or file storage.
(defun compactify-blocks (buffer)
(declare #.$optimize-buffering)
;; OUTPUT and INPUT refer to the same block chain, namely the blocks
;; that currently exist in BUFFER.
(let* ((input-block-chain (buffer-chain buffer))
(output-block-chain input-block-chain)
;; Output blocks are not popped off the chain until
;; advancing beyond the current block. This way the tail
;; can be smashed to NIL when reaching the end of input.
(output-block (car output-block-chain))
(output-index 0)
;; Setting INPUT-BLOCK now is only for type-correctness of the
;; initial value. It will be set again immediately before reading
(input-block (car input-block-chain))
(input-index 0) ; block-relative index
(input-position 0) ; absolute
(deletion-point 0)
(deletion-length 0))
(declare ((simple-array octet-type 1) output-block input-block)
(array-index output-index input-index input-position))
;; Drop any pre-allocated but unused block in the input chain.
(when (cdr (buffer-next buffer))
(assert (eq (buffer-block buffer) (car (buffer-next buffer))))
(rplacd (buffer-next buffer) nil))
;; The reason for deferring this POP 'til after the preceding "drop"
;; is that if there were exactly two input blocks, one used and one not
;; used at all, INPUT-BLOCK-CHAIN should become NIL.
(setq input-block (pop input-block-chain))
(with-word-buffer-iterator
(deletion-point-getter (octet-buffer-backpatches buffer))
(labels
((find-next-deletion-point ()
;; If the deletion point is one at which no bytes should be deleted -
;; probably impossible as it means a submessage length took >21 bits
;; (= 4 bytes) to encode - skip until finding somewhere to delete,
;; or else finding that there are no further deletion points.
(let ((word (deletion-point-getter)))
(if (not word)
(setq deletion-point most-positive-fixnum deletion-length 0)
(let ((n-bytes (logand (the fixnum word) #b11)))
(if (zerop n-bytes)
(find-next-deletion-point)
(setq deletion-point (ash word -2)
deletion-length n-bytes))))))
(next-output-block ()
(setq output-block-chain (cdr output-block-chain)
output-block (car output-block-chain)
output-index 0)
(length output-block))
(copy-to-output (count)
(declare ((and fixnum unsigned-byte) count))
(when (zerop count)
(return-from copy-to-output))
(let ((space-available (- (length output-block) output-index)))
(declare (array-index count space-available))
;; See if the output needs to be advanced to the next block.
(when (zerop space-available)
(setq space-available (next-output-block)))
;; Avoid copying until the earlist point at which bytes need to move.
;; This rapidly skips over blocks that contain only fixed-length data
;; provided they are the first blocks in the serialized output.
;; Not likely, but happens.
(when (and (eq output-block input-block)
(eql output-index input-index))
(incf output-index count)
(incf input-index count)
(return-from copy-to-output))
;; A chunk of input can span more than one block of output due to
;; variable-length blocks.
(loop
(let ((stride (min count space-available)))
;; COUNT and SPACE-AVAILABLE are both positive,
;; so this will copy at least one octet.
(fast-replace output-block output-index
input-block input-index stride)
(incf output-index stride)
(incf input-index stride)
(if (eql (decf count stride) 0) (return)))
(when (zerop (setq space-available
(- (length output-block) output-index)))
(setq space-available (next-output-block))))))
(compute-input-block-length ()
;; Only the final block is possibly shorter than its allocated length.
;; The others are as long as allocated, each larger than its predecessor.
(if input-block-chain
(length input-block)
(buffer-index buffer))))
(declare (inline next-output-block compute-input-block-length))
(prog ((block-length (compute-input-block-length))
(total-deletion-count 0))
(declare (array-index block-length total-deletion-count))
tippytop
(find-next-deletion-point)
top
(let* ((remaining-length (- block-length input-index))
(n-bytes-to-copy
(min remaining-length (- deletion-point input-position))))
(copy-to-output n-bytes-to-copy)
(incf input-position n-bytes-to-copy)) ; absolute
(when (eql input-index block-length)
(unless input-block-chain
(rplacd output-block-chain nil) ; terminate the list
;; Free the unnecessary word-buffer blocks. Also makes additional calls
;; to COMPACTIFY on this buffer do nothing, which seems reasonable.
(setf (octet-buffer-backpatches buffer) **empty-word-buffer**)
(return (make-serialized-protobuf
(buffer-chain buffer)
(- input-position total-deletion-count)
output-index)))
(setq input-block (pop input-block-chain)
block-length (compute-input-block-length)
input-index 0)
(go top))
;; now we must be at a deletion point
(unless (and (= input-position deletion-point) (plusp deletion-length))
(protobuf-error "Octet buffer compaction bug"))
(let ((remaining-length (- block-length input-index)))
(if (>= remaining-length deletion-length)
(incf input-index deletion-length) ; easy case
;; Skip remainder of this block and start of one more. Deleted ranges
;; never span more than 2 blocks since deletion-length <= 3
;; and blocks are much larger than 3 octets.
(setq input-block (pop input-block-chain)
block-length (compute-input-block-length)
input-index (- deletion-length remaining-length))))
(incf input-position deletion-length)
(incf total-deletion-count deletion-length)
(go tippytop))))))
(defun reset-buffer-chain (buffer chain)
"Make BUFFER have CHAIN as its list of octet arrays"
(setf (buffer-block buffer) (car chain)
(buffer-index buffer) 0
(buffer-chain buffer) chain
(buffer-next buffer) chain
(buffer-%block-absolute-start buffer) 0)
;; Zero-fill, or not. This should depend on SAFETY and/or DEBUG,
;; but there is no way to discover the current policy
;; without using implementation-specific code.
#+nil
(dolist (block chain)
(fill block 0)))
(defun force-to-stream (buffer)
"Write the octets currently in BUFFER to its target stream,
and rewind BUFFER so that it is empty."
;; Before COMPACTIFY-BLOCKS messes up the chain, copy it.
;; Then compactify and copy to the target stream.
(let ((chain (copy-list (buffer-chain buffer)))
(backpatch-chain (buffer-chain (octet-buffer-backpatches buffer)))
(stream (the stream (octet-buffer-target buffer))))
(flet ((out-block (block length)
(write-sequence block stream :start 0 :end length)))
(declare (dynamic-extent #'out-block))
(call-with-each-block #'out-block (compactify-blocks buffer)))
(reset-buffer-chain buffer chain)
(setf (octet-buffer-n-gap-bytes buffer) 0)
;; Heuristically resize the backpatch buffer, trying to avoid subsequent expansion
;; Ideally we would do this only only on the *next* attempted use of the buffer,
;; but that's not as easy as just sizing up now, even if no further write will occur.
;; The worst-case is when the backpatch buffer is never needed again,
;; but was nonetheless resized to be larger. But that's probably not common.
(let ((backpatches (octet-buffer-backpatches buffer)))
(reset-buffer-chain
backpatches
(if (cdr backpatch-chain)
(list (new-block backpatches
(loop for block in backpatch-chain
sum (length block))))
backpatch-chain)))))
;; Given either a SERIALIZED-PROTOBUF or a BUFFER, return the concatenation
;; of all BLOCKS. You probably don't want to do this on an uncompacted BUFFER.
;; That usually makes no sense in any scenario other than debugging.
(defun concatenate-blocks (buffer)
(multiple-value-bind (total-length blocks)
(etypecase buffer
(serialized-protobuf
(values (serialized-protobuf-total-length buffer)
(serialized-protobuf-blocks buffer)))
(buffer
(values (loop for (block . rest) on (buffer-chain buffer)
sum (if rest (length (the (simple-array * 1) block))
(buffer-index buffer))
fixnum)
(buffer-chain buffer))))
(declare (array-index total-length))
(let ((result (make-array total-length :element-type 'octet-type))
(index 0))
(declare (array-index index))
(dolist (block blocks result)
(replace result (the (simple-array octet-type 1) block) :start1 index)
(incf index (length (the (simple-array * 1) block)))))))
;; Given a BUFFER or a SERIALIZED-PROTOBUF, call FUNCTION once with each
;; block, passing it also the effective length of the block.
(defun call-with-each-block (function buffer)
(etypecase buffer
(serialized-protobuf
(let ((blocks (serialized-protobuf-blocks buffer)))
(loop
(let ((block (car blocks)))
(funcall function block
(if (cdr blocks)
(length (the (simple-array * 1) block))
(serialized-protobuf-final-block-length buffer))))
(pop blocks)
(if (null blocks) (return)))))
(buffer
(let ((blocks (buffer-chain buffer)))
(loop
(let ((block (car blocks)))
(funcall function block
(if (cdr blocks)
(length (the (simple-array * 1) block))
(buffer-index buffer))))
(pop blocks)
(if (null blocks) (return)))))))
;;;
#+sbcl
(declaim (sb-ext:maybe-inline encode-uint32))
(macrolet ((define-varint-encoder (name reserve-bytes lisp-type
&optional (expr 'input))
`(progn
(declaim (ftype (function (,lisp-type buffer)
(values (integer 1 ,(or reserve-bytes 5)) &optional))
,name))
(defun ,name (input buffer &aux (val ,expr))
(declare (type ,lisp-type input)
(type (unsigned-byte ,(second lisp-type)) val))
;; The locally declare gives us optimizations inside the locally
;; but leaves the typechecking in the function.
(locally
(declare #.$optimize-buffering)
,@(when reserve-bytes
`((buffer-ensure-space buffer ,reserve-bytes)))
(let ((n 0))
(declare (fixnum n))
(loop (let ((bits (ldb (byte 7 0) val)))
(setq val (ash val -7))
(fast-octet-out buffer
(ilogior bits (if (i= val 0) 0 128)))
(iincf n))
(when (eql val 0) (return n)))))))))
(define-varint-encoder encode-uint32 5 (unsigned-byte 32))
(define-varint-encoder encode-uint64 10 (unsigned-byte 64))
;; It is best to keep all occurrences of (LDB (BYTE 64 0) ...) out of calling code
;; because that forces boxing in many cases, and even it if doesn't create a new bignum,
;; it causes generic arithmetic routines to be used.
;; Hiding the LDB operation inside a primitive encoder is better for efficiency.
(define-varint-encoder encode-int64 10 (signed-byte 64)
;; On SBCL the LOGAND compiles to nothing.
#+sbcl (logand input sb-vm::most-positive-word)
#-sbcl (ldb (byte 64 0) input))
;; FAST-ENCODE simply omits the call to ENSURE-SPACE and might not be worth keeping
(define-varint-encoder fast-encode-uint32 nil (unsigned-byte 32)))
(define-compiler-macro encode-uint32 (&whole form val buffer)
(let (encoded-length)
(if (and (typep val 'fixnum) (i<= (setq encoded-length (length32 val)) 2))
(let ((low7 (logand val #x7F)))
(case encoded-length
(1 `(progn (octet-out ,buffer ,low7)
1))
(2 `(progn (octet-out2 ,buffer ,(logior #x80 low7) ,(ldb (byte 7 7) val))
2))))
form)))
;; For encoding an object tag + wire-type, we can compile-time convert ENCODE-UINT32
;; into a few OCTET-OUT calls. I'll only do this for 1 and 2-octet writes though,
;; which is enough for field-indices up to (2^14)-1.
(defun octet-out2 (buffer first second)
(octet-out buffer first)
(octet-out buffer second))
;;;
;; A BUFFER does not, in general, interact through a stream interface
;; (WRITE-BYTE, WRITE-SEQUENCE) however there is some support in SBCL
;; for treating it as though it were a character output stream.
;; In general it is faster to use OCTET-OUT, however a stream produces
;; less garbage if the alternative would be to call WRITE-TO-STRING on
;; something and serialize the resultant string. The buffer can do this
;; for you as long as you only write ASCII characters, because the
;; stream mode does not have a UTF-8 encoder. (It could, but doesn't)
#+sbcl
(progn
(defstruct (octet-output-stream
(:conc-name octet-stream-)
;; Maybe Todo: supply a BOUT (byte-out) handler function.
(:include sb-kernel:ansi-stream
(out #'octet-stream-char-out)
(sout #'octet-stream-string-out))
(:constructor make-octet-output-stream (buffer)))
;; How many characters should the character producer be permitted to write
;; before we complain about a protocol error.
(space-available 0 :type fixnum)
(buffer nil :type octet-buffer))
(defun protocol-error (stream)
(protobuf-error "Octet stream protocol error on ~S" stream))
(defun octet-stream-char-out (stream character)
;; A streamified BUFFER accept only ASCII characters (for now).
;; This is more of a sanity-check than a limitation, and it's a mild
;; limitation if that- the ENCODE-STRING protobuf serializer performs
;; encoding and doesn't use its BUFFER as a stream. It uses OCTETS-OUT.
(unless (<= (char-code character) 127)
(protocol-error stream))
(octet-out (octet-stream-buffer stream) (char-code character)))
(defun octet-stream-limited-char-out (stream character)
(cond ((or (zerop (octet-stream-space-available stream))
(> (char-code character) 127))
(protocol-error stream))
(t
(decf (octet-stream-space-available stream))
(octet-out (octet-stream-buffer stream) (char-code character)))))
(defun octet-stream-string-out (stream string start end)
(declare (string string) (array-index start end))
(let ((f (sb-kernel:ansi-stream-out stream)))
(sb-kernel:with-array-data ((string string) (start start) (end end))
(loop for i fixnum from start below end
do (funcall f stream (char string i))))))
(defun %get-buffer-stream (buffer)
(or (octet-buffer-stream buffer)
(setf (octet-buffer-stream buffer) (make-octet-output-stream buffer))))
(declaim (ftype (function (buffer) (values stream &optional))
get-unlimited-buffer-stream get-tiny-buffer-stream)
(ftype (function (buffer fixnum) (values stream &optional))
get-bounded-buffer-stream))
;; Return a stream that accepts any number of characters.
;; A placeholder must already have been reserved for the length prefix.
(defun get-unlimited-buffer-stream (buffer)
(let ((stream (%get-buffer-stream buffer)))
;; Setting the space to 0 ensures we can't call the 'limited'
;; char out function without getting an obvious failure.
(setf (octet-stream-space-available stream) 0
(sb-kernel:ansi-stream-out stream) #'octet-stream-char-out)
stream))
;; Return a stream that accepts a tiny string. 1 byte is reserved for the length.
(defun get-tiny-buffer-stream (buffer)
(buffer-ensure-space buffer 128) ; 1 byte prefix, <= 127 string characters
(fast-octet-out buffer 0) ; easy way to leave a 1-byte space
(let ((stream (%get-buffer-stream buffer)))
(setf (octet-stream-space-available stream) 127
(sb-kernel:ansi-stream-out stream) #'octet-stream-limited-char-out)
stream))
;; Return a stream that accepts a known-length string. The length gets encoded first.
(defun get-bounded-buffer-stream (buffer n-chars)
(encode-uint32 n-chars buffer) ; emit the variable-length length prefix
(let ((stream (%get-buffer-stream buffer)))
(setf (octet-stream-space-available stream) n-chars
(sb-kernel:ansi-stream-out stream) #'octet-stream-limited-char-out)
stream))
;; WITH-BUFFER-AS-STREAM binds STREAM to a character output stream that when written to
;; places ASCII characters into BUFFER. There are three cases, listed here
;; in order from most efficient to least efficient:
;; 1. (WITH-BUFFER-AS-STREAM (stream buffer :length n)
;; Length specified as an integer N (evaluated at runtime) will encode a prefix of N
;; then accept N characters. Writing anything other than exactly N will signal an eror.
;; 2. (WITH-BUFFER-AS-STREAM (stream buffer :length :TINY)
;; Length specified as the literal symbol :TINY will leave a 1-byte gap for a prefix.
;; (... :length N) where N runtime evaluates to the keyword :TINY is not legal.
;; Between 0 and 127 characters may be written, and the prefix will be modified accordingly.
;; An error will be signaled if more than 127 characters are written.
;; 3. (WITH-BUFFER-AS-STREAM (stream buffer) ...)
;; No length specified will leave a 4-byte placeholder for an arbitrary length and
;; backpatch it in. This relies on buffer compactification in the same way as does
;; writing of an unknown-length submessage.
;; In all cases, non-ASCII characters are rejected.
;; If TAG is supplied, it is encoded prior to the encoding of the string data.
;; This macro should be used for effect, not value - its return value is undefined.
(defmacro with-buffer-as-stream ((stream-var buffer &key length (tag nil tag-p))
&body body &environment env)
(with-gensyms (start-pos start-block start-index)
`(progn
,@(if tag-p `((encode-uint32 ,tag ,buffer)))
,(cond ((not length) ; most general
`(with-placeholder (,buffer :position ,start-pos)
(let ((,stream-var (get-unlimited-buffer-stream ,buffer)))
,@body)
(backpatch
(i- (buffer-absolute-position ,buffer)
;; Buffer's absolute pos was marked at the first octet of the
;; placeholder for the varint.
;; Actual number of chars written is 4 less than that.
,start-pos 4))))
((eq length :tiny)
`(let ((,stream-var (get-tiny-buffer-stream ,buffer))
(,start-block (octet-buffer-block ,buffer))
(,start-index (1- (buffer-index ,buffer))))
,@body
(locally
,@(when (sb-c:policy env (= safety 0))
`((declare (optimize (sb-c::insert-array-bounds-checks 0)))))
(setf (aref ,start-block ,start-index)
(i- 127 (octet-stream-space-available ,stream-var))))))
(t
`(let ((,stream-var (get-bounded-buffer-stream ,buffer ,length)))
,@body
,@(when (sb-c:policy env (> safety 0))
;; The stream will croak upon trying to write >LENGTH chars.
;; With safety, ensure *exactly* that many were written.
`((unless (zerop (octet-stream-space-available ,stream-var))
(protocol-error ,stream-var))))))))))
) ; end of #+sbcl (PROGN ...)
;; The portable implementation of WITH-BUFFER-AS-STREAM
#-sbcl
(defmacro with-buffer-as-stream ((stream-var buffer &key length) &body body)
(declare (ignore length))
`(let ((,stream-var (make-string-output-stream)))
,@body
(encode-string (get-output-stream-string ,stream-var)
,buffer)))