Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moka loses cache with curl #299

Closed
jchimene opened this issue Aug 8, 2023 · 9 comments
Closed

Moka loses cache with curl #299

jchimene opened this issue Aug 8, 2023 · 9 comments
Labels
question Further information is requested

Comments

@jchimene
Copy link

jchimene commented Aug 8, 2023

The cache gets lost when using crate curl.

I'm guessing it has something to do with curl's memory management when sending data.

mini-moka doesn't have this problem. However, I lose the Expiry trait. If this bug attracts any interest, I will work up a reproducer; which is a fairly significant time suck if there's no sympathy in working this issue. I'd rather spend my time working around the loss of Expiry.

@jchimene jchimene changed the title Moka looses cache with curl Moka loses cache with curl Aug 8, 2023
@jchimene
Copy link
Author

jchimene commented Aug 8, 2023

It's looking like this is a lazy_static issue. Since it's being deprecated, I've switched the cache to once_cell. Seems to be working. I'm reserving the right to re-open this once I've cleaned up the carnage.

@jchimene jchimene closed this as completed Aug 8, 2023
@jchimene
Copy link
Author

jchimene commented Aug 8, 2023

It's the implementation of expire_after() that conflicts with curl. Somehow.
[UPDATE]
Why doesn't the crate make the on-cache duration available? I have to implement my own Expiry trait since the call to builder().expire_after(ACExpiry{}) yields a cache that gets dropped somehow by the curl crate.

I expect I'll have to switch to a different cache manager now. moka is simply too opaque.

I doubt this will lead to any insight, but here's a log:

running 1 test
Entry { key: Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }, value: 100ms, is_fresh: true }
1 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 100ms}, poisoned: false, .. }
2 Mutex { data: {}, poisoned: false, .. }
3 Mutex { data: {}, poisoned: false, .. }
4 Mutex { data: {}, poisoned: false, .. }
5 Mutex { data: {}, poisoned: false, .. }

The cache evaporates when it runs into the wall of curl.

@jchimene jchimene reopened this Aug 8, 2023
@tatsuya6502
Copy link
Member

tatsuya6502 commented Aug 8, 2023

Hi. Thanks you for trying moka.

Looking at the log, I think you still have the Cache instance itself, but it lost the entry with key Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }. Perhaps it got expired by the Expiry? (0.1 second?)

Can you please tell us what you are trying to achieve with the Expiry? What is the expected behavior? If you need, I will write some sample code.

Also, I am not sure how curl crate is used. Are you storing something returned by curl to the cache?

@jchimene
Copy link
Author

jchimene commented Aug 9, 2023

Hi - Thanks for the reply! I've spent several days battling this issue. My apologies in advance if my replies seem curt.

Looking at the log, I think you still have the Cache instance itself, but it lost the entry with key Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }. Perhaps it got expired by the Expiry? (0.1 second?)

Yes, it's a short expiry time. It's meant to simulate real time. I pushed the TTL and it still disappears:

Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: expire after create
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 11.2s expire: 11.2s
Entry { key: Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }, value: 11.2s, is_fresh: true }

Point("HEALTH"): expire after read
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 11.2s current_duration: Some(49.871588ms) tti: 300ms retval: Some(49.871588ms)
1 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 11.2s}, poisoned: false, .. }
2 Mutex { data: {}, poisoned: false, .. }
3 Mutex { data: {}, poisoned: false, .. }
4 Mutex { data: {}, poisoned: false, .. }
5 Mutex { data: {}, poisoned: false, .. }

Can you please tell us what you are trying to achieve with the Expiry? What is the expected behavior?

I want to track 4 different activities across at least 28 days.
Activities have TTI as follows: vec![*FIVE_MIN, *TWO_DAYS, *FIVE_MIN, *TWO_DAYS,];
Activities have TTL as follows vec![*FIVE_MIN, *TWENTYEIGHT_DAYS, *FIVE_MIN, *SIX_HRS,];

The cache must also be saved across image activations. I have the serialization/deserialization working. If I mock up the curl call, the Expiry trait works as expected. I am not storing anything from the call in the cache.

Here is a snipped version of the code that produces the log:

pub fn process_incoming(
    keap_app: &KeapApp,
    twilio_msg: &TwilioMessage,
    tx_result: &Sender<QueueKind>,
) {
    let msg = twilio_msg.msg.clone().unwrap_or("".to_string());

    // Format the phone number in US 10-digit format
    let sms = &Phone::from(twilio_msg.sms.clone().unwrap_or("".to_string()).as_str());

    println!("1 {:?}", ACTIVITY_CONTEXT.cache);
    let x = keap_xmlrpc::find_by_field(<SNIP>);
    println!("3 {:?}", ACTIVITY_CONTEXT.cache);
    let y = x.map_or_else(
        |e| Err(e),
        |v| Ok(Contact::new(&v, &sms.clone().to_string())),
    );
    match y {
        Ok(contact) => {
            println!("4 {:?}", ACTIVITY_CONTEXT.cache);
            let token = msg.clone();
              if matches!(
                token.as_str(),
                "REST" | "HEALTH" | "LYMPH" | "YAY" | "TEST_PI_040"
            ) {
                manage_message(<SNIP>);
            } else {<SNIP>}
        }
      <SNIP>
    }
}

fn manage_message(
    keap_app: &KeapApp,
    activity: &Activity,
    twilio_msg: &TwilioMessage,
    contact: &Contact,
    tx_result: &Sender<QueueKind>,
) {
    println!("5 {:?}\n", ACTIVITY_CONTEXT.cache);
    if !ACTIVITY_CONTEXT.exists(activity) {
        ACTIVITY_CONTEXT.put(activity);
        ACTIVITY_CONTEXT.get(&Activity {
            sms: activity.clone().sms,
            token: TokenKind::from("START"),
        });
        // respond to command or challenge event
        return achieve_goal(
            keap_app,
            &activity.clone().token.to_string(),
            twilio_msg,
            &contact,
            tx_result,
        );
    } else {
        redo(
            tx_result,
            &twilio_msg,
            IoError::new(ErrorKind::NotFound, "message already processed".to_string()).to_string(),
        );
    }
}

Here's the current Expiry impl

impl Expiry<Activity, Duration> for ACExpiry {
    #[allow(unused_variables)]
    fn expire_after_create(
        &self,
        activity: &Activity,
        activity_ttl: &Duration,
        current_time: Instant,
    ) -> Option<Duration> {
        println!("\n{:?}: expire after create", activity);
        let mut current_time_elapsed = current_time.elapsed();
        if self.debug {
            current_time_elapsed = Duration::from_millis(
                1000 * u64::try_from(current_time_elapsed.as_millis()).unwrap_or(0),
            );
        }

        let expire = activity_ttl
            .checked_sub(current_time_elapsed)
            .unwrap_or(Duration::default());

        println!(
            "activity: {:?}, current_time: {:?} activity_ttl: {:?} expire: {:?}",
            activity.token, current_time_elapsed, activity_ttl, expire,
        );

        return match activity.token.clone() {
            TokenKind::Point(_) | TokenKind::Stop | TokenKind::Help | TokenKind::Start => {
                Some(expire)
            }
            TokenKind::Unknown => Some(Duration::default()),
        };
    }

    #[allow(unused_variables)]
    fn expire_after_read(
        &self,
        activity: &Activity,
        activity_ttl: &Duration,
        current_time: Instant,
        current_duration: Option<Duration>,
        last_modified_at: Instant,
    ) -> Option<Duration> {
        println!("\n{:?}: expire after read", activity.token);
        let mut current_time_elapsed = current_time.elapsed();
        let tti = match activity.token.clone() {
            TokenKind::Point(_) | TokenKind::Stop | TokenKind::Help | TokenKind::Start => {
                if self.debug {
                    current_time_elapsed = Duration::from_millis(
                        1000 * u64::try_from(current_time_elapsed.as_millis()).unwrap_or(0),
                    );
                }
                TTI[usize::from(&activity.token)]
            }
            TokenKind::Unknown => Duration::default(),
        };

        let retval = if current_time_elapsed > *activity_ttl {
            Some(tti)
        } else {
            current_duration
        };

        println!(
            "activity: {:?}, current_time: {:?} activity_ttl: {:?} current_duration: {:?} tti: {:?} retval: {:?}",
            activity.token,
            current_time_elapsed, activity_ttl, current_duration, tti, retval,
        );

        return retval;
    }

    #[allow(unused_variables)]
    fn expire_after_update(
        &self,
        activity: &Activity,
        activity_ttl: &Duration,
        current_time: Instant,
        current_duration: Option<Duration>,
    ) -> Option<Duration> {
        debug!("{:?}: expire after update\n", activity);
        let mut current_time_elapsed = current_time.elapsed();
        let ttl = match activity.token.clone() {
            TokenKind::Point(_) | TokenKind::Stop | TokenKind::Help | TokenKind::Start => {
                if self.debug {
                    current_time_elapsed = Duration::from_millis(
                        1000 * u64::try_from(current_time_elapsed.as_millis()).unwrap_or(0),
                    );
                }
                TTL[usize::from(&activity.token)]
            }
            TokenKind::Unknown => Duration::default(),
        };

        let retval = if current_time_elapsed > *activity_ttl {
            Some(ttl)
        } else {
            current_duration
        };

        debug!(
            "activity: {:?}, current_time: {:?} activity_ttl: {:?} current_duration: {:?} ttl: {:?} retval: {:?}\n",
            activity.token,
            current_time_elapsed, activity_ttl, current_duration, ttl, retval,
        );

        return retval;
    }
}

Here's the mocked version of the code that demonstrate Expiry works as expected in the absence of curl

pub fn process_incoming(
    keap_app: &KeapApp,
    twilio_msg: &TwilioMessage,
    tx_result: &Sender<QueueKind>,
) {
    let msg = twilio_msg.msg.clone().unwrap_or("".to_string());

    // Format the phone number in US 10-digit format
    let sms = &Phone::from(twilio_msg.sms.clone().unwrap_or("".to_string()).as_str());

    println!("1 {:?}", ACTIVITY_CONTEXT.cache);
    match Ok(Contact {
        id: 0,
        first_name: "fred".to_string(),
        sms: "elmer".to_string(),
        points: 0,
    })
    .map_or_else(|e:u32| Err(e), |v| Ok(v))
    {
        // let x = keap_xmlrpc::find_by_field(
        //     keap_app,
        //     "Phone5",
        //     &sms.to_string(),
        //     vec!["Id", "FirstName", "_Points"],
        //     None,
        //     None,
        // );
        // let y = x.map_or_else(
        // |e| Err(e),
        // |v| Ok(Contact::new(&v, &sms.clone().to_string())),
        // );
        // match y {
        Ok(contact) => {
            println!("4 {:?}", ACTIVITY_CONTEXT.cache);
            let token = msg.clone()
.
.
.

Here's the log:

Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: expire after create
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 100ms expire: 100ms
Entry { key: Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }, value: 100ms, is_fresh: true }

Point("HEALTH"): expire after read
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 100ms current_duration: Some(49.872244ms) tti: 300ms retval: Some(49.872244ms)
1 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 100ms}, poisoned: false, .. }
4 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 100ms}, poisoned: false, .. }
5 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 100ms}, poisoned: false, .. }


@tatsuya6502
Copy link
Member

tatsuya6502 commented Aug 9, 2023

Hi!

I've spent several days battling this issue.

Sorry for hearing that. I see you are struggling with translating per-activity-token TTI and TTL into the Expiry trait methods. How about the following ACExpiry example? You will only need to write the tti and ttl methods, and the rest of the code should be ready to go. (I did not test the code, so please let me know if you find any issues)

pub struct ACExpiry;

impl ACExpiry {
    fn tti(&self, activity: &Activity) -> Duration {
        // TODO: Return the TTI for the activity.token. (e.g. TWO_DAYS)
        todo!()
    }

    fn ttl(&self, activity: &Activity) -> Duration {
        // TODO: Return the TTL for the activity.token. (e.g. TWENTYEIGHT_DAYS)
        todo!()
    }
}

impl Expiry<Activity, Duration> for ACExpiry {
    fn expire_after_create(
        &self,
        activity: &Activity,
        _activity_ttl: &Duration,
        _current_time: Instant,
    ) -> Option<Duration> {
        Some(self.tti(activity))
    }

    fn expire_after_update(
        &self,
        activity: &Activity,
        _activity_ttl: &Duration,
        _current_time: Instant,
        _current_duration: Option<Duration>,
    ) -> Option<Duration> {
        Some(self.tti(activity))
    }

    fn expire_after_read(
        &self,
        activity: &Activity,
        _activity_ttl: &Duration, // I am not sure what to do with this.
        current_time: Instant,
        _current_duration: Option<Duration>,
        last_modified_at: Instant,
    ) -> Option<Duration> {
        let tti = self.tti(activity);
        let ttl = self.ttl(activity);

        // Age is the duration since this activity was created or updated.
        let age = current_time.duration_since(last_modified_at);

        if age + tti < ttl {
            // Current age + TTI is shorter than TTL, so we can extend the
            // expiry by TTI.
            Some(tti)
        } else if age < ttl {
            // Current age is shorter than TTL, so we can extend the expiry
            // by the remaining duration until TTL. (The remaining duration
            // should be shorter than TTI)
            Some(ttl - age)
        } else {
            // Already lived longer than or equal to TTL. Expire now.
            Some(Duration::default())
        }
    }
}

I am not a native English speaker and I believe the documents I wrote for moka have some places to improve. I will try to make it better over time, but if you have any questions or suggestions, please feel free to open a GH discussion, issue or PR.

@tatsuya6502
Copy link
Member

tatsuya6502 commented Aug 9, 2023

Following up my previous post.

I pushed the TTL and it still disappears:

Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: expire after create
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 11.2s expire: 11.2s
Entry { key: Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }, value: 11.2s, is_fresh: true }

Point("HEALTH"): expire after read
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 11.2s current_duration: Some(49.871588ms) tti: 300ms retval: Some(49.871588ms)
1 Mutex { data: {Activity { sms: US("(520) 555-1220"), token: Point("HEALTH") }: 11.2s}, poisoned: false, .. }
2 Mutex { data: {}, poisoned: false, .. }
3 Mutex { data: {}, poisoned: false, .. }

The reason the cached entry disappeared at "2 Mutex" is that ACExpiry was not implemented correctly. The retval was Some(49.871588ms), which means "the entry must expire in 49.871588ms from now".

Point("HEALTH"): expire after read
activity: Point("HEALTH"), current_time: 0ns activity_ttl: 11.2s current_duration: Some(49.871588ms) tti: 300ms retval: Some(49.871588ms)

Calling real curl would take longer than 49.871588ms, so the entry expired while calling curl. This is why the entry disappeared. ACExpiry should have returned the TTI Some(300ms) instead, which may be long enough to keep the entry after calling curl.

You should try my ACExpiry in my previous post to see if it fixes the problem.

@jchimene
Copy link
Author

jchimene commented Aug 9, 2023

Ok, thanks for the effort! I will implement and repost.

@jchimene
Copy link
Author

jchimene commented Aug 9, 2023

Thanks! Worked out of the box.
Like me, I'm sure you believe code is speech. Please consider adding this version of Expiry to the documenation. Your answer goes a long way toward explaining moka's implementation of this trait.

@tatsuya6502
Copy link
Member

I am glad to hear that you got it working!

Please consider adding this version of Expiry to the documenation. Your answer goes a long way toward explaining moka's implementation of this trait.

Sure. I will follow it up with this:

@tatsuya6502 tatsuya6502 added the question Further information is requested label Aug 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants