Skip to content

Commit

Permalink
Pretty printing the CE in Express using WASI
Browse files Browse the repository at this point in the history
  • Loading branch information
cardil committed Apr 18, 2023
1 parent bd18b84 commit 3c6191d
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 49 deletions.
11 changes: 11 additions & 0 deletions expressjs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions expressjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"homepage": "https://github.com/openshift-knative/showcase#readme",
"dependencies": {
"@unleash/express-openapi": "^0.2.2",
"@wasmer/wasi": "^1.2.2",
"axios": "^0.25.0",
"cloudevents": "^6.0.4",
"dotenv": "^14.3.2",
Expand Down
24 changes: 16 additions & 8 deletions expressjs/scripts/extract-webjar.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ const chalk = require('chalk')
const path = require('path')

class Webjar {

/**
* Webjar stores the information about the webjar extraction.
*
*
* @param {Object} vars - The variables to use.
* @param {string} vars.group - The group of the webjar.
* @param {string} vars.artifact - The artifact of the webjar.
Expand Down Expand Up @@ -46,7 +47,7 @@ const webjars = [

/**
* Extracts all webjars.
*/
*/
async function extractWebjars() {
let ps = []
for (const webjar of webjars) {
Expand All @@ -55,11 +56,17 @@ async function extractWebjars() {
return await Promise.all(ps)
}

/**
* @callback Log
* @param {string} message
* @param {...any} args
*/

/**
* Creates a log function for the given webjar.
*
* @param {Webjar} webjar
* @returns {(message: string, ...args: any[]) => void}
*
* @param {Webjar} webjar
* @returns {Log}
*/
function createLog(webjar) {
return (...message) => {
Expand All @@ -69,8 +76,8 @@ function createLog(webjar) {

/**
* Extracts the webjar to the target directory.
*
* @param {Webjar} webjar
*
* @param {Webjar} webjar
*/
async function extractWebjar(webjar) {
const log = createLog(webjar)
Expand All @@ -88,7 +95,8 @@ async function extractWebjar(webjar) {
zipEntries.forEach(async zipEntry => {
// outputs zip entries information
if (zipEntry.entryName.startsWith(webjar.source) && !zipEntry.isDirectory) {
const targetPath = zipEntry.entryName.replace(webjar.source, webjar.target)
const targetPath = zipEntry.entryName
.replace(webjar.source, webjar.target)
log(`${chalk.yellow(zipEntry.entryName)} -> ${chalk.green(targetPath)}`)

const targetDir = path.dirname(targetPath)
Expand Down
8 changes: 4 additions & 4 deletions expressjs/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const routes = {
events: require('./routes/events/endpoint'),
}

const loaders = app => {
const loaders = async app => {
// Middleware Functions
middleware.logging(app)
middleware.public(app)
Expand All @@ -30,16 +30,16 @@ const loaders = app => {
routes.home(app)
routes.info(app)
routes.hello(app)
routes.events(app)
await routes.events(app)
}

const createApp = () => {
const createApp = async () => {
dotenv.config()

const ex = express()

// Start initializing
loaders(ex)
await loaders(ex)

return ex
}
Expand Down
106 changes: 81 additions & 25 deletions expressjs/src/routes/events/endpoint.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,90 @@
const { HTTP } = require('cloudevents')
const openapi = require('../../lib/openapi')
const EventStore = require('./store')
const { PrinterFactory } = require('./pretty-print')
const devdata = require('./devdata')

const store = new EventStore()
devdata.forEach(event => store.add(event))
const printerFactory = new PrinterFactory()

module.exports = async app => {
app.get('/events', streamDoc, (_, res) => {
res.set('Content-Type', 'text/event-stream')
res.set('Cache-Control', 'no-cache')
res.set('Connection', 'keep-alive')
res.set('X-SSE-Content-Type', 'application/cloudevents+json')
res.set('transfer-encoding', 'chunked')
res.flushHeaders()
/**
* @typedef {import('./pretty-print').Printer} Printer
* @typedef {import('express').Express} Express
* @typedef {import('express').Request} Request
* @typedef {import('express').Response} Response
*/

const stream = store.createStream(res)
stream.stream()
})
/**
* @type {Printer}
*/
let printer

app.post('/events', eventDoc, (req, res) => {
try {
const ce = HTTP.toEvent({ headers: req.headers, body: req.body })
ce.validate()
store.add(ce)
res.status(201).end()
} catch (err) {
console.error(err)
res.status(500)
res.json(err)
}
})
/**
* @type {EventStore}
*/
let store

/**
* Initializes the routes.
*
* @param {Express} app - the Express app
*/
async function events(app) {
printer = await printerFactory.create()

app.get('/events', streamDoc, stream)
app.post('/', eventDoc, recv)
app.post('/events', eventDoc, recv)

store = new EventStore()
devdata.forEach(event => recvEvent(event))
}

/**
* Streams all registered CloudEvents.
*
* @param {Request} _
* @param {Response} res - the HTTP response
*/
function stream(_, res) {
res.set('Content-Type', 'text/event-stream')
res.set('Cache-Control', 'no-cache')
res.set('Connection', 'keep-alive')
res.set('X-SSE-Content-Type', 'application/cloudevents+json')
res.set('transfer-encoding', 'chunked')
res.flushHeaders()

const str = store.createStream(res)
str.stream()
}

/**
* Receives a CloudEvent from an HTTP request.
* @param {Request} req - the HTTP request
* @param {Response} res - the HTTP response
* @returns {void}
*/
function recv(req, res) {
try {
const ce = HTTP.toEvent({ headers: req.headers, body: req.body })
recvEvent(ce)
res.status(201).end()
} catch (err) {
console.error(err)
res.status(500)
res.json(err)
}
}

/**
* Receives a CloudEvent, logs, and stores it.
*
* @param {CloudEvent} ce - the CloudEvent to receive
*/
function recvEvent(ce) {
ce.validate()
store.add(ce)
const out = printer.print(ce)
console.log('Received:\n', out)
}

const streamDoc = openapi.path({
Expand Down Expand Up @@ -97,3 +151,5 @@ const eventDoc = openapi.path({
}
}
})

module.exports = events
102 changes: 102 additions & 0 deletions expressjs/src/routes/events/pretty-print.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* global WebAssembly */

const { WASI, init } = require('@wasmer/wasi')
const { HTTP } = require('cloudevents')
const fs = require('fs').promises
const path = require('path')

class PrinterFactory {

/**
* Creates a new Printer instance.
*
* @returns {Promise<Printer>} - the new Printer instance
*/
async create() {
// Initialize WASI
await init()
const wasi = new WASI({
args: ['cloudevents-pretty-print.wasm'],
env: {},
})
const wasm = path.join(__dirname, '../../../build/wasm/cloudevents-pretty-print.wasm')
const buf = new Uint8Array(await fs.readFile(wasm))
const module = await WebAssembly.compile(buf)

// Instantiate the WASI module
const instance = await wasi.instantiate(module, {})

return new Printer({ instance })
}
}

class Printer {

/**
* Creates a new Printer instance.
*
* @param {Object} v - the params object
* @param {WebAssembly.Instance} v.instance - the WebAssembly instance
*/
constructor({ instance }) {
this.mem = instance.exports.memory
this.fn = instance.exports.pp_print
}

/**
* Prints a CloudEvent into a human readable text.
*
* @param {CloudEvent} ce - the CloudEvent to print
* @returns {string} - the human readable text
*/
print(ce) {
const message = HTTP.structured(ce).body

writeToMemory(message, this.mem)

const rc = this.fn(0)
if (rc !== 0) {
throw new Error(`pp_print() returned ${rc}`)
}

return readFromMemory(this.mem)
}
}

/**
* Writes a string into the shared memory as a CString.
*
* @param {string} message - the string to write
* @param {WebAssembly.Memory} mem - the shared memory
*/
function writeToMemory(message, mem) {
const enc = new TextEncoder()
const view = new Uint8Array(mem.buffer)
const state = enc.encodeInto(message, view)
view[state.written] = 0
}

/**
* Reads a CString from the shared memory.
*
* @param {WebAssembly.Memory} mem - the shared memory
* @returns {string} - the string read from the shared memory
*/
function readFromMemory(mem) {
const view = new Uint8Array(mem.buffer)
let messageBytes = []
for (let i = 0; i < view.length; i++) {
if (view[i] === 0) {
break
}
messageBytes.push(view[i])
}
const dec = new TextDecoder('utf-8')
const res = dec.decode(new Uint8Array(messageBytes))
return res
}

module.exports = {
Printer,
PrinterFactory
}
4 changes: 2 additions & 2 deletions expressjs/test/middleware/health.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ const { expect, describe, it } = require('@jest/globals')
describe('Route', () => {
const app = createApp()
it('GET /health/ready', async () => {
const res = await request(app)
const res = await request(await app)
.get('/health/ready')
expect(res.status).toBe(200)
expect(res.headers['content-type']).toMatch(/application\/json/)
expect(res.body).toEqual({ checks: [], status: 'UP' })
})

it('GET /health/live', async () => {
const res = await request(app)
const res = await request(await app)
.get('/health/live')
expect(res.status).toBe(200)
expect(res.headers['content-type']).toMatch(/application\/json/)
Expand Down
2 changes: 1 addition & 1 deletion expressjs/test/middleware/metrics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { expect, describe, it } = require('@jest/globals')

describe('Route', () => {
it('GET /metrics', async () => {
const app = createApp()
const app = await createApp()
const res = await request(app)
.get('/metrics')
expect(res.status).toBe(200)
Expand Down
Loading

0 comments on commit 3c6191d

Please sign in to comment.