Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Latest commit

 

History

History
62 lines (54 loc) · 2.44 KB

File metadata and controls

62 lines (54 loc) · 2.44 KB

Home | Logs | Amazon Kinsesis | Kinsesis Analytics | Help/Resources.

For a Detailed Description of Kinesis Analytics, please check out the page on the AWS website.

Here is the code we are using in the class:


-- ** Continuous Filter ** 
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "myanalyticsstream" (
    event_time VARCHAR(32),
    country   VARCHAR(32),
    country_alpha VARCHAR(4),
    amount_eur INTEGER);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "myanalyticsstream"
-- Select all columns from source stream
SELECT STREAM "event_time", "country", "country_alpha", "amount_eur"
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE "country" NOT IN ('HU', 'RO');

Optional:

Here is the Athena table definition command if you want to read from the Firehose Delivery stream:

CREATE EXTERNAL TABLE `ceu_test_table`(
  `event_dime` timestamp COMMENT 'from deserializer', 
  `iban_from` string COMMENT 'from deserializer', 
  `iban_to` string COMMENT 'from deserializer', 
  `country` string COMMENT 'from deserializer', 
  `country_alpha` string COMMENT 'from deserializer', 
  `amount_eur` int COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  's3://ceu-lambda/ceu2020'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'transient_lastDdlTime'='1551165071')