Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/1.0.0 #143

Merged
merged 25 commits into from
Aug 15, 2022
Merged

Release/1.0.0 #143

merged 25 commits into from
Aug 15, 2022

Conversation

colmsnowplow
Copy link
Collaborator

@colmsnowplow colmsnowplow commented Jun 14, 2022

v1 Release PR

Creating this now in order to record a 'snag list' of things we need to fix before release.

TODO:

pkg/telemetry/const.go Outdated Show resolved Hide resolved
go.mod Outdated Show resolved Hide resolved
@colmsnowplow
Copy link
Collaborator Author

Two commits don't have related issues, these need to be created and added before release.

@colmsnowplow
Copy link
Collaborator Author

I've just realised #140 should also have removed cmd/serverless.go.

@colmsnowplow
Copy link
Collaborator Author

colmsnowplow commented Jul 5, 2022

Authorship of
Add ability to configure TLS for Kafka and HTTP changed to me in the rebase - this isn't accurate as this work was entirely @TiganeteaRobert's work. In the rebase, I'll edit the authorship to correct that.

pkg/target/kafka.go Outdated Show resolved Hide resolved
pkg/target/http.go Outdated Show resolved Hide resolved
@colmsnowplow colmsnowplow force-pushed the release/1.0.0 branch 4 times, most recently from 16d494d to 08522b7 Compare July 22, 2022 16:23
Copy link

@istreeter istreeter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done everyone who contributed to this! There's loads of good stuff here.

My main two concerns (already mentioned to Colm) are:

Acking messages out-of-order. For example, acking filtered messages before acking unfiltered. In KCL this would definitely be wrong, because you would accidentally checkpoint records that hadn't been safely processed yet. I would like to understand more about how Kinsumer handles checkpointing under-the-hood and whether this is safe to do.

Graceful shutdown. I can see you call Stop() on the kinsumer client, but I've been trying to figure out what happens next in the concurrent process which is looping over incoming messages. Ideally, a stream processing app should finish writing messages to the target, then ack all outstanding messages, then exit. I think you might be OK based on reading this code block but I would like to understand this shutdown better.

cmd/cli/cli.go Outdated
Comment on lines 126 to 130
err := common.DeleteTemporaryDir()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you're deleting the temporary directory before waiting for the rest of the application to finish. You might argue that it's safe, but I think it's good principle of graceful shutdown to only start removing resources after the dependent processes have been allowed to finish.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, this is true. I actually don't think we need to remove the directory at all. Really this should only happen when we're exiting, which means the instance is getting shut down anyway. What's your opinion of just removing the cleanup part altogether?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now moved to within the shutdown process, after other resources have been shut down.

cmd/init.go Outdated
Comment on lines 47 to 48
if cfg.Data.GoogleServiceAccountB64 != "" {
targetFile, err := common.GetGCPServiceAccountFromBase64(cfg.Data.GoogleServiceAccountB64)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get here that you're accepting credentials from an environment variable (or hcl file). But I'm slightly surprised that this feature has been implemented in stream replicator.

Do you know for what deployment setups it will be necessary to use this feature? Versus using the standard authentication methods for GCP.

I'm asking because if it's an unnecessary feature then I would suggest removing it before version 1.0.0, instead of supporting something we don't need. But there might be a setup I don't know about where this environment variable trick is necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually, this is Data Fabric's preference - it's how @jbeemster initially set it up, and I never really deviated from that.

It's stored as a kubernetes secret in the GCP deployment, so should be secure.

On AWS, similar secrets are container secrets.

I think the reason behind it is that it's easier and cheaper to handle things this way than to mount a storage volume that's only used for these creds files - but I'm not the best person to ask on that front, data fabric can speak to it better than me.

I'm totally happy to consider a different model, if we think there's a better one (but I would be tempted to push back on it being part of this release, just because of how keen I am to wrestle back some release momentum).

What are the standard authentication methods you refer to/how would you have expected this to work?

Copy link
Member

@jbeemster jbeemster Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lets us map a GCP credential into AWS Lambda where you cannot mount a volume so you can write from AWS -> GCP via Lambda (which is still a supported runtime). Same in CloudFunctions if we want to overwrite the credentials for the target.

Copy link
Collaborator Author

@colmsnowplow colmsnowplow Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(which is still a supported runtime)

Actually, we're ripping that support out in this PR.

So I guess we should change this feature too...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's been done now - we just expect the service account file to be provided to the environment, and the GOOGLE_APPLICATION_CREDENTIALS env var set, as one would for any other app authenticating for GCP.

Comment on lines +18 to +20
// Create returns a pointer to an output structure given a pointer
// to an input structure. This interface is expected to be implemented
// by components that are creatable through a configuration.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it very helpful to have code comments like this on interface definitions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adatzer deserves the credit for this, and I definitely agree.

config/config.go Outdated
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
GoogleServiceAccountB64 string `hcl:"google_application_credentials_b64,optional" env:"GOOGLE_APPLICATION_CREDENTIALS_B64"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
EnableTelemetry bool `hcl:"enable_telemetry,optional" env:"ENABLE_TELEMETRY"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all other Snowplow applications, the config option is telemetry.disable (not enable). I now cannot remember why we called the option disable instead of enable. I think it's because we thought it would be weird to make a user set enable=false. Because you would guess that setting enable=false would be the same as not setting anything at all. Whereas to configure it as disable=true feels a bit more like you're turning on the feature (of disablement).

Sorry I haven't explained that very well. I will see if I can dig out the original conversation from last year.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice it but now I can't unsee it. Totally agree, will make that change no problem. :)

(reminds me of https://en.wikipedia.org/wiki/The_Design_of_Everyday_Things)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done - and I also noticed that this flag wasn't actually doing anything. So now it does actually disable telemetry too.

@@ -0,0 +1,147 @@
# Configuring stream-replicator from a file

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example configs are really neat; I like this user-friendly way of configuring the app.

In other Snowplow apps (actually just the recent ones) we have tried to be consistent across apps in how we name configuration options. For stream replicator it won't be possible to get the same consistency, because the config file structures are so different. But, I do think we should review parameter names and assess if there are any places we can be more consistent with other Snowplow apps.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense - I'd rather make that kind of change now than later. I was about to try to compile a big list of all options, but then I realised I'm just duplicating all the example files by doing so. Are there any that jump out at you/ how would you suggest we identify the ones that we'd rather change?

}

// CreateTLSConfiguration creates a TLS configuration for use in a target
func CreateTLSConfiguration(crt, key, ca, destination string, skipVerify bool) (*tls.Config, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my comment about the GCP credentials env var.... it feels like this function is burdening application code with something that should be handled by the deployment fabric. For what type of deployment setups is it necessary to have this in the code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer as the other - Data Fabric requested that it be this way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really only for systems where you cannot mount a volume (like Lambda). If we exclude those runner options then I agree we can move away from this model and simply mount volumes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said in the other comment, we're ripping that support out, so I guess we should change this back to how it was before, and handle it in the terraform stack.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now expects a path to a file and it's down to the deployment to provide that file to the environment.

// Send emits the bufferred metrics to the receiver
func (s *StatsDStatsReceiver) Send(b *models.ObserverBuffer) {
func (s *statsDStatsReceiver) Send(b *models.ObserverBuffer) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this app gets deployed for real-time stream replicating, then the metric customers (and the business) will care about is latency measured by the collector timestamp. i.e. what is the total age of the event in the pipeline by the time it is written to the target.

I spoke to Colm about this already, and I get that it's missing because stream replicator was designed to be agnostic about message type, so it has no awareness of collector timestamp. For reference though, that used to be true of our S3 loader, but we ended up implementing "awareness" (here) because it was important for s3 loader to report that metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can see the logic here. I think it needs some consideration, and I'd argue that it can be post-v1 release. If you disagree, of course, I have an open mind.

I've created an isssue for it - perhaps we can discuss it there?

Comment on lines 79 to 92
go func() {
makeAndTrackHeartbeat(telemetry, tracker)
for {
<-ticker.C
makeAndTrackHeartbeat(telemetry, tracker)
}
}()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't figure out what happens during graceful shutdown. I would guess this for loop needs to select between the ticker and a stop channel, so that it can exit cleanly when the app gets a sig term. But I'm not a go expert so I might have got this wrong!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first glance I think you're right, we'll instrument the pattern you suggest or similar.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done now.

@colmsnowplow
Copy link
Collaborator Author

colmsnowplow commented Jul 26, 2022

@istreeter bear with me as a few of those questions require a bit of effort to answer properly. I'll pop in my answers to the easier ones first though. :)

@colmsnowplow
Copy link
Collaborator Author

Acking messages out-of-order. For example, acking filtered messages before acking unfiltered. In KCL this would definitely be wrong, because you would accidentally checkpoint records that hadn't been safely processed yet. I would like to understand more about how Kinsumer handles checkpointing under-the-hood and whether this is safe to do.

This is handled in our fork of kinsumer.

There's an in-memory checkpointer state for each shard, which keeps track of the latest checkpointed sequence number. When the checkpointers commit function is called, it updates DDB with the latest sequence number. This is called periodically on a ticker.

With 'manual checkpointing', kinsumer returns an updatefunc, which is responsible for updating the latest processed sequence number. When we do so, we don't update the sequence number until the previous one has been updated. This is what prevents us from accidentally committing past an unprocessed sequence number. It does open up the possibility of having more duplicates, but we accept this trade-off, and elsewhere we take measures to mitigate that risk.

I do think that we could do with a test which demonstrates this behaviour, either in our kinsumer fork, or in this project. I'l need to think about this a bit but I'll see what I can come up with.

@colmsnowplow
Copy link
Collaborator Author

colmsnowplow commented Jul 26, 2022

Graceful shutdown. I can see you call Stop() on the kinsumer client, but I've been trying to figure out what happens next in the concurrent process which is looping over incoming messages. Ideally, a stream processing app should finish writing messages to the target, then ack all outstanding messages, then exit. I think you might be OK based on reading this code block but I would like to understand this shutdown better.

This is in kinsumer as well - kinsumer waits for the main waitgroup when we call Stop(). This returns the Run() function, and that will trigger some deferred functions to stop all the different processes involved.

The most relevant of these is stopConsumers - which closes the k.stop channel. The effect of this is:

  1. Where we're attempting to capture shards, we immediately stop doing so
  2. For processes currently processing data, we:

The timeout in that last bullet point exists because if another consumer wants to seize ownership of the shard, we're preventing that for a period here. However, in the case where something's gone wrong in checkpointing - or checkpointing is simply taking too long (which is configurable in kinsumer but not yet in stream replicator), we do need to give up ownership.

So basically, kinsumer won't shut down until we've acked all the records, unless we take too long to ack the records in which case it will just shut itself down and we'll get duplicates.

For this one also, I'd love to add some kind of unit or integration test for it, but I'll need some time to figure that out.

@colmsnowplow
Copy link
Collaborator Author

@istreeter I've tried to explain the mechanisms involved in your two main comments - I'm not sure how well I've explained the complexity so please shout if discussing it would help!

@istreeter
Copy link

Regarding checkpointing. You explained it well, and your links were helpful. I understand now that kinsumer blocks on the checkpointing function until the previous record has also been checkpointed.

But this means that you must checkpoint every message. If you ever accidentally don't checkpoint a message, then kinsumer will end up permanently blocked. Am I right on this?

It makes me concerned about this retrying logic. If there are some failures and some successes, then you drop the successes and retry calling t.Write with the failures only. This means you never ack the dropped successes. Please tell me if I'm wrong about this, because it looks like a big problem.

More generally, it makes me nervous about the code structure with respect to acking. Currently acking is happening in multiple different places: in t.Write and in ft.WriteOversized and in ft.WriteInvalid and completely separately again for filtered messages. To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

@colmsnowplow
Copy link
Collaborator Author

colmsnowplow commented Jul 26, 2022

But this means that you must checkpoint every message. If you ever accidentally don't checkpoint a message, then kinsumer will end up permanently blocked. Am I right on this?

As I understand it, basically yeah - but what happens in this scenario is that the shard owner that is blocked will give up ownership of the shard, and another will assume control from the point at which it last successfully checkpointed. So in that scenario we'd get lots of dupes rather than anything more serious like data loss.

(Edit to add:)

The important logic responsible for that is kinsumer's clientRecordMaxAge setting - if the client hasn't been able to checkpoint with DDB for longer than this period of time other clients will stop recognising it as owning the shard and will eventually claim ownership of it -the new consumer will keep trying to claim ownership until the max age has passed, after which it'll just claim the shard from the last successful checkpoint.

But that's only if we don't ack every message, which I don't think will happen unless we have some error or crash.

It makes me concerned about this retrying logic. If there are some failures and some successes, then you drop the successes and retry calling t.Write with the failures only. This means you never ack the dropped successes. Please tell me if I'm wrong about this, because it looks like a big problem.

We ack in the Write function. For kinesis target, either the whole batch succeeds and we ack, or we fail the whole batch (this isn't great would love to improve it).

For other targets, we always ack successes straight away.

So we do ignore successes at this point, but only after they've been acked.

By the way, currently the kinesis source only returns messages one by one, rather than in batches, so we don't have any possibility of hitting that scenario until we fix batching regardless.

More generally, it makes me nervous about the code structure with respect to acking. Currently acking is happening in multiple different places: in t.Write and in ft.WriteOversized and in ft.WriteInvalid and completely separately again for filtered messages. To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

I take the point, we could likely improve readability of the code. I'd be curious about the latency implications of moving the acks to one place at the very end - perhaps it's minimal. I'm definitely open to better code design here though, so am interested to surface ideas.

However, I would challenge you on this point:

To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

Failure targets are just targets with a wrapper on the data to construct a bad row. So we actually only ever have one thing responsible for acking - the Write function of a target. We call it here and here.

Overall, this is a design that pre-existed my involvement in the project, and I haven't had need to expend effort to redesign it to date - I'm definitely open to doing that.

To be 100% sure that you are acking every single message,

I should ask - in light of this explanation, do you think there is a more serious concern that something might be wrong here? (in which case I'm keen to flesh that out sooner), or is this a code readability issue? (in which case I agree, but perhaps it can be addressed in the next release).

@istreeter
Copy link

I should ask - in light of this explanation, do you think this is a code readability issue? (in which case I agree, but perhaps it can be addressed in the next release), or is there a more serious concern that something might be wrong here?

Before your explanation, I thought there was a more serious issue, and something was wrong. But you now satisfied me that every target acks the messages immediately after a successul write.

So is there a readability issue..... I don't know. It took me a while to untangle the combination of retrying + acking, because of how it was spread across different files. But now that I understand it, it no longer seems so bad.

So we actually only ever have one thing responsible for acking - the Write function of a target

Well... strictly speaking this line too. But I agree it's not as bad as I made out.

@colmsnowplow
Copy link
Collaborator Author

colmsnowplow commented Jul 26, 2022

OK nice, good to hear I've managed to explain it.

Well... strictly speaking this line too. But I agree it's not as bad as I made out.

Ah, yes you're right! I forgot about this - this is basically my fault for somewhat crowbarring in that part of the filtering logic, and in retrospect it does kind of conflict with the overall design. This is for sure something to think about, it's somewhat jarring design-wise now that I reflect on it through a wider lens.

@colmsnowplow
Copy link
Collaborator Author

As far as my own review goes, once the filters are fixed up (ie this PR: #185), I think I'm ready to approve.

There are quite a few things that could be better, but they don't necessarily warrant blocking release. The below follow up issues have been created:

Transform config could be better: #189
Unit tests could be simpler: #190

Filters:
Base filter should fail earlier on misconfiguration: #191
API should change to negative lookahead: #188
Remove some jank: #186

Analytics SDK-related:

Two bugs and one potential improvment have been opened on the analytics sdk repo:
Bug - snowplow/snowplow-golang-analytics-sdk#37
Bug - snowplow/snowplow-golang-analytics-sdk#36
Gjson - snowplow/snowplow-golang-analytics-sdk#38

When they get addressed, these issues are to track the changes in this project that should follow:

Fixup after bugs fixed: #187
API change if we do so in analytics SDK: #193

@colmsnowplow colmsnowplow force-pushed the release/1.0.0 branch 3 times, most recently from 5b23e9c to 649e979 Compare August 12, 2022 14:34
@colmsnowplow colmsnowplow merged commit df2008c into master Aug 15, 2022
@colmsnowplow colmsnowplow deleted the release/1.0.0 branch August 15, 2022 10:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants