
(Athena/Presto) Window function (lag) is not reading the previous lag row

I have two tables,
and
and want to show each event individually, so I will do an insert (I'm doing a union in this case, which is logically the same). I know the key for both tables and am doing a union of them while sharing the same partition key
that I'll be using for the window. I'm trying to use lag to fill in the rest of the null rows, as shown in the screenshot. The code for the second column looks as follows:
coalesce(invoice_nk, lag(invoice_nk, 1, '') over (partition by cm_dd order by event_at)) as invoice_nk
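The behaviour of this expression can be reproduced outside Athena with a minimal sketch. It assumes that SQLite (through Python's sqlite3 module, which needs SQLite 3.25 or later for window functions) evaluates lag the same way Presto does, and it uses made-up rows for a single hypothetical credit memo with an integer event_at instead of a timestamp.

```python
import sqlite3

# Hypothetical rows for one credit memo (cm_dd = 'cm1'): only the first
# event carries an invoice_nk; the later item events have NULL.
ROWS = [
    ("cm1", 1, "inv-9"),
    ("cm1", 2, None),
    ("cm1", 3, None),
]


def naive_fill(rows):
    """Apply the coalesce(x, lag(x)) pattern from the question to `rows`."""
    con = sqlite3.connect(":memory:")
    con.execute("create table combined (cm_dd text, event_at integer, invoice_nk text)")
    con.executemany("insert into combined values (?, ?, ?)", rows)
    sql = """
        select event_at,
               coalesce(invoice_nk,
                        lag(invoice_nk, 1, '') over (partition by cm_dd order by event_at)
               ) as invoice_nk
        from combined
        order by event_at
    """
    result = con.execute(sql).fetchall()
    con.close()
    return result


print(naive_fill(ROWS))  # [(1, 'inv-9'), (2, 'inv-9'), (3, None)]
```

lag reads the previous row's original invoice_nk column, not the coalesced output of that row, so once two NULL rows are adjacent the second one stays NULL. The '' default only applies when there is no previous row in the partition at all, not when the previous value is NULL.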
For some reason, the third row shown in the picture does not pick up the value from the previous lagged row and insert it into its own row. I presume it has something to do with Athena not evaluating the lag function recursively: it takes the previous row in each partition literally and doesn't wait for the previous rows to be computed first. Is there a way to grab values recursively, as I'm trying to do? Thanks.
Update: Here's the full query:
with
  cme as(
    with
      cm as(
        select
    --      change_log
          row_number() over (partition by id order by updated_at desc) r
        , cast(cm.invoice_id as varchar(99))                         as invoice_nk
        , cast(cm.id as varchar(99))                                 as cm_dd
        , cast(cm.created_at as timestamp)                           as cm_created_at
        , cast(cm.updated_at as timestamp)                           as cm_updated_at
        from "postgres-replica-parquet".credit_memos cm
      )
    select
      case when r = 1 then cm_created_at
           else cm_updated_at end as cm_begin_at
    , null as cmi_begin_at
    , invoice_nk
    , cm_dd
    , cm_created_at
    , cm_updated_at
    from cm
  )
, cmi as(
    with
      ci as(
        select
    --      ci.change_log
          row_number() over (partition by id order by updated_at desc) r
        , cast(ci.credit_memo_id as varchar(99)) as cm_id
        , cast(ci.created_at as timestamp)       as cmi_created_at
        , cast(ci.updated_at as timestamp)       as cmi_updated_at
        from "postgres-replica-parquet".credit_memo_items ci
      )
    select -- row 1 is the snapshot when cdc first started
      null as cm_begin_at
    , case when r = 1 then cmi_created_at
           else cmi_updated_at end as cmi_begin_at
    , null as invoice_nk
    , cm_id as cm_dd
    , null as cm_created_at
    , null as cm_updated_at
    from ci
  )
, combined as(
    with
      all as(
        select * from cme
        union select * from cmi
      )
    select
      greatest(
        coalesce(cm_begin_at, cast(date_parse('1970-01-01', '%Y-%m-%d') as timestamp))
      , coalesce(cmi_begin_at, cast(date_parse('1970-01-01', '%Y-%m-%d') as timestamp))
      ) as event_at
    , *
    from all
  )
select
  event_at
, cm_begin_at as event_credit_memo_last_at
, cmi_begin_at as event_credit_memo_item_last_at
, coalesce(invoice_nk, lag(invoice_nk, 1, '') over (partition by cm_dd order by event_at)) as invoice_nk
, cm_dd as credit_memo_dd
from combined
And here is a snapshot of the full result. Rows 3 and 12 should have the invoice_nk based on the credit_memo.
In the query, cm is the subquery that grabs the credit memos, cmi is the subquery that grabs the credit memo items, and combined just precomputes the order-by key for the window functions. Note that both are pulled from a database that has the full change log (CDC).
The end goal is to make one row for each event, where I define an event for the final table as any time either a credit memo or a credit memo item undergoes a change. I would like to be able to snapshot the state of a credit memo combined with the credit memo item by windowing at the
for reporting.
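Since lag cannot see its own earlier results, one commonly used workaround for carrying the last known invoice_nk forward is a gaps-and-islands query: a running count of non-NULL values assigns every NULL row to the group of the most recent non-NULL row, and a max over that group spreads the value. The sketch below is a swapped-in technique, not the asker's query; it again assumes SQLite via Python's sqlite3 behaves like Presto for these window functions, and the rows, the integer event_at, and the helper name forward_fill are all hypothetical.

```python
import sqlite3

# Hypothetical rows shaped like the question's "combined" CTE. event_at is
# simplified to an integer; invoice_nk is NULL on credit memo item events.
ROWS = [
    ("cm1", 1, "inv-9"),
    ("cm1", 2, None),
    ("cm1", 3, None),
    ("cm1", 4, "inv-10"),  # invoice changed on a later credit memo event
    ("cm1", 5, None),
    ("cm2", 1, None),      # no credit memo row yet, so it stays NULL
    ("cm2", 2, "inv-7"),
]

# count() ignores NULLs, so the running count only increases on rows that
# carry an invoice_nk; every NULL row shares the group number of the latest
# non-NULL row before it. Each group holds at most one non-NULL value, which
# max() then copies to every row in the group.
FILL_SQL = """
with grouped as (
    select cm_dd, event_at, invoice_nk,
           count(invoice_nk) over (
               partition by cm_dd order by event_at
               rows between unbounded preceding and current row
           ) as grp
    from combined
)
select cm_dd, event_at,
       max(invoice_nk) over (partition by cm_dd, grp) as invoice_nk
from grouped
order by cm_dd, event_at
"""


def forward_fill(rows):
    """Return (cm_dd, event_at, invoice_nk) with invoice_nk carried forward."""
    con = sqlite3.connect(":memory:")
    con.execute("create table combined (cm_dd text, event_at integer, invoice_nk text)")
    con.executemany("insert into combined values (?, ?, ?)", rows)
    result = con.execute(FILL_SQL).fetchall()
    con.close()
    return result


for row in forward_fill(ROWS):
    print(row)
```

The window functions used here are standard, so the same query shape should carry over to Athena, though it has not been checked there. If your engine version accepts a null-treatment clause (Trino's documentation describes IGNORE NULLS for lag and last_value), last_value(invoice_nk) ignore nulls over (partition by cm_dd order by event_at) may be a shorter alternative. Ties on event_at make the running count order-dependent, so a tiebreaker column in the order by may be needed.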
Asked by Eugene Choi (11 rep)
Mar 11, 2022, 03:11 PM
Last activity: Nov 13, 2024, 02:01 PM