Browse Source

Refactor: more like an app, less like a script

It should also implement auto-renewing of expired subscriptions.
master
Chimo 5 years ago
parent
commit
838acc4a08
11 changed files with 511 additions and 283 deletions
  1. +72
    -0
      classes/Event.js
  2. +286
    -0
      classes/Lifestream.js
  3. +140
    -0
      classes/Subscription.js
  4. +4
    -275
      main.js
  5. +2
    -1
      package.json
  6. +1
    -1
      sources/blog.js
  7. +1
    -1
      sources/gnufm.js
  8. +1
    -1
      sources/gnusocial.js
  9. +2
    -2
      sources/gogs.js
  10. +1
    -1
      sources/mediagoblin.js
  11. +1
    -1
      sources/pullet.js

+ 72
- 0
classes/Event.js View File

@ -0,0 +1,72 @@
( function() {
/*global console: false, module: false*/
"use strict";
var Event;
/**
* [[Description]]
* @param {[[Type]]} data [[Description]]
*/
Event = function( data ) {
var that = this;
Event.fields.forEach( function( field ) {
that[ field ] = data[ field ];
} );
};
/*
* SQL columns
*/
Event.fields = [
"id",
"subscription_id",
"title",
"content",
"published",
"updated",
"foreign_url",
"object_type",
"object_verb"
];
/**
* [[Description]]
* @param {[[Type]]} database [[Description]]
* @param {[[Type]]} callback [[Description]]
*/
Event.prototype.insert = function( database, callback ) {
var that = this;
database.getConnection( function( err, connection ) {
if ( err ) {
/* FIXME: logger.debug( "[Event::insert] Couldn't connect to SQL: " + err.stack ); */
console.log( "[Event::insert] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"INSERT INTO event(" +
"subscription_id, title, content, published, updated, " +
"foreign_url, object_type, object_verb " +
") VALUES( ?, ?, ?, ?, ?, ?, ?, ? );",
[
that.subscription_id,
that.title,
that.content,
that.published,
null,
that.source,
that.objectType,
that.verb
],
callback
);
connection.release();
} );
};
module.exports = Event;
}() );

+ 286
- 0
classes/Lifestream.js View File

@ -0,0 +1,286 @@
( function() {
/*global require: false, console: false, JSON: false, module: false*/
"use strict";
var Lifestream,
mysql = require( "mysql" ),
WebSocketServer = require( "ws" ).Server,
pubSubHubbub = require( "pubsubhubbub" ),
extend = require( "extend" ),
Subscription = require( "./Subscription.js" ),
/* Utils: */
format_date,
http_get,
_setTimeout;
/**
* Constructor
* @param {Object} options configurations
*/
Lifestream = function( options ) {
var that = this;
// Set config values
that.config = extend( true, Lifestream.defaults, options );
// Setup things
that.sql = that._setupSQL();
that.wss = that._setupWebsockets();
that.subscriber = that._setupSubscriber();
that.subscriptions = [];
// Subscribe to things
that.config.subs.forEach( function( sub ) {
var subscription = new Subscription( {
topic: sub.topic,
huburi: sub.hub,
type: sub.type
} );
that.subscriptions[ sub.topic ] = subscription;
that.subscribe( subscription );
} );
};
/**
* Default configuration values
*/
Lifestream.defaults = {
"callback": {
"port": 1337,
"url": "http://example.org"
},
"db": {
"host": "localhost",
"name": "lifestream",
"user": "lifestream",
"password": ""
},
"subs": [],
"websockets": {
"port": 8090
},
"log": {
"transports": [
{
"type": "console",
"level": "error",
"handleExceptions": true
},
{
"type": "file",
"level": "error",
"handleExceptions": true,
"filename": "lifestream.log"
}
]
}
};
/**
* [[Description]]
* @param {Object} subscription [[Description]]
*/
Lifestream.prototype.subscribe = function( subscription ) {
this.subscriber.subscribe( subscription.topic, subscription.huburi );
};
/**
* Setup MySQL pool connection
*
* https://www.npmjs.com/package/mysql#pooling-connections
* @returns {Object} SQL pool
*/
Lifestream.prototype._setupSQL = function() {
var db = this.config.db;
return mysql.createPool( {
host: db.host,
database: db.name,
user: db.user,
password: db.password,
charset: db.charset || "UTF8_GENERAL_CI"
} );
};
/**
* Setup websockets
*
* https://www.npmjs.com/package/ws
* @returns {Object} websocket
*/
Lifestream.prototype._setupWebsockets = function() {
var wss = new WebSocketServer( { port: this.config.websockets.port } );
wss.broadcast = function broadcast( data ) {
wss.clients.forEach( function each( client ) {
client.send( data );
} );
};
return wss;
};
/**
* After subscription
*/
Lifestream.prototype._subscribed = function( subscription, data ) {
var now = new Date(),
expires = new Date( data.lease * 1000 ), // 'lease' is in seconds, Date() needs ms
timeUntilExpiration = Math.max( expires - now, 1000 * 60 * 60 ); // FIXME: My GS lease is messed up
// Update subscriptions datetimes
subscription.modified = now;
subscription.sub_start = now;
subscription.sub_end = expires;
// Renew when expires
_setTimeout.call( subscription, subscription.renew, timeUntilExpiration, this );
// Insert/update in DB
subscription.insert( this.sql, function( err ) {
if ( err ) {
/* FIXME: logger.debug( "Error inserting subscription: " + err.stack ); */
console.log( "Error inserting subscription: " + err.stack );
return;
}
/* FIXME: logger.debug( "Inserted subscription in SQL table" ); */
console.log( "Inserted subscription in SQL table" );
} );
};
/**
* Setup subscriber
*/
Lifestream.prototype._setupSubscriber = function() {
var that = this,
callback = that.config.callback,
subscriber = pubSubHubbub.createServer( {
"callbackUrl": callback.url + ":" + callback.port
} );
subscriber.on( "error", function() {
console.log( "error" );
} );
subscriber.on( "subscribe", function( data ) {
that._subscribed( that.subscriptions[ data.topic ], data );
} );
subscriber.on( "feed", function( response ) {
var topic = response.topic;
Subscription.getKV( "topic", topic, that.sql, function( subscription ) {
if ( subscription === null ) {
/* FIXME: logger.debug( "Sub doesn't exist" ); */
console.log( "Sub doesn't exist" );
return;
}
http_get( topic, function( data ) {
var event = subscription.getEvent( data, topic );
if ( event === null ) {
/* FIXME: logger.debug( "Got a ping, but nothing to insert" );
logger.debug( "Topic: " + topic ); */
console.log( "Got a ping, but nothing to insert" );
console.log( "Topic: " + topic );
return;
}
event.subscription_id = subscription.id;
event.published = format_date( event.published );
event.insert( that.sql, function( err, result ) {
if ( err ) {
/* FIXME: logger.debug( "Error inserting event: " + err.stack ); */
console.log( "Error inserting event: " + err.stack );
return;
}
// Tell websockets what type of event this is
event.type = subscription.type;
// Tell websockets the id of the new item we inserted
event.id = result.insertId;
// Notify websockets
that.wss.broadcast( JSON.stringify( event ) );
} );
} );
} );
} );
subscriber.listen( callback.port );
return subscriber;
};
// ############# UTILS #############
// From: https://developer.mozilla.org/en-US/docs/Web/API/WindowTimers/setTimeout#A_possible_solution
// TODO: We don't need to keep original functionality since we're calling it a different name
_setTimeout = function( vCallback, nDelay ) {
var oThis = this,
aArgs = Array.prototype.slice.call( arguments, 2 );
return setTimeout( vCallback instanceof Function ? function() {
vCallback.apply( oThis, aArgs );
} : vCallback, nDelay );
};
/**
* Date string to MySQL DATETIME format
*
* @param {String} date A date to convert
* @returns {String} MySQL DATETIME format
*/
format_date = function( date ) {
var d = new Date( date );
// Invalide date, fallback to "now"
if ( isNaN( d.getTime() ) ) {
d = new Date();
}
return d.toISOString().slice( 0, -5 ).replace( "T", " " );
};
/**
* [[Description]]
* @param {String} url [[Description]]
* @param {[[Type]]} callback [[Description]]
*/
http_get = function( url, callback ) {
var req = ( url.substr( 0, 8 ) === "https://" ) ? require( "https" ) : require( "http" ),
handleResponse;
handleResponse = function( response ) {
var data = "";
response.on( "data", function( chunk ) {
data += chunk;
} );
response.on( "end", function() {
callback( data );
} );
};
req.get( url, handleResponse )
.on( "error", function( e ) {
/* FIXME: logger.debug( "Error trying to fetch topic: " + e.message );
logger.debug( "Topic: " + url ); */
console.log( "Got error trying to fetch topic: " + e.message );
console.log( "Topic: " + url );
} );
};
module.exports = Lifestream;
}() );

+ 140
- 0
classes/Subscription.js View File

@ -0,0 +1,140 @@
( function() {
/*global console: false, require: false, module: false*/
"use strict";
var Subscription,
Event = require( "./Event.js" );
/**
* [[Description]]
*/
Subscription = function( data ) {
var that = this;
Subscription.fields.forEach( function( field ) {
that[ field ] = data[ field ];
} );
that.parser = require( "../sources/" + that.type + ".js" );
};
/*
* SQL Columns
*/
Subscription.fields = [
"id",
"topic",
"type",
"huburi",
"secret",
"state",
"last_ping",
"created",
"modified",
"sub_start",
"sub_end"
];
/**
* [[Description]]
* @param {[[Type]]} data [[Description]]
* @returns {[[Type]]} [[Description]]
*/
Subscription.prototype.getEvent = function( data, topic ) {
var obj = this.parser.parse( data, topic );
return new Event( obj );
};
/**
* [[Description]]
* @param {[[Type]]} key [[Description]]
* @param {[[Type]]} value [[Description]]
* @param {[[Type]]} database [[Description]]
* @param {[[Type]]} callback [[Description]]
*/
Subscription.getKV = function( key, value, database, callback ) {
database.getConnection( function( err, connection ) {
if ( err ) {
/* FIXME: logger.debug( "[onFeed] Couldn't connect to SQL: " + err.stack ); */
console.log( "[onFeed] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"SELECT * FROM subscription WHERE ?? = ?",
[ key, value ],
function( err, res ) {
if ( err ) {
/* FIXME: logger.debug( "Couldn't fetch subscription from DB: " + err.stack ); */
console.log( "Couldn't fetch subscription from DB: " + err.stack );
callback( err );
return;
}
// Subscription doesn't exist
if ( res.length === 0 ) {
callback( null );
return;
}
// Build subscription
callback( new Subscription( res[ 0 ] ) );
}
);
connection.release();
} );
};
/**
* [[Description]]
* @param {[[Type]]} database [[Description]]
*/
Subscription.prototype.renew = function( database ) {
database.subscribe( this );
};
/**
* [[Description]]
* @param {[[Type]]} database [[Description]]
*/
Subscription.prototype.insert = function( database, callback ) {
var that = this;
database.getConnection( function( err, connection ) {
if ( err ) {
/* FIXME: logger.debug( "[onSubscribe] Couldn't connect to SQL: " + err.stack ); */
console.log( "[onSubscribe] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"INSERT INTO subscription(" +
"topic, huburi, type, secret, state, last_ping, created, " +
"modified, sub_start, sub_end " +
") VALUES( ?, ?, ?, ?, ?, ?, DEFAULT, ?, ?, ? )" +
"ON DUPLICATE KEY UPDATE modified=NOW(), sub_start=NOW(), sub_end=?;",
[
that.topic,
that.huburi,
that.type,
null,
"active",
null,
that.modified,
that.sub_start,
that.sub_end,
that.sub_end
],
callback
);
connection.release();
} );
};
module.exports = Subscription;
}() );

+ 4
- 275
main.js View File

@ -2,275 +2,10 @@
/*global require: false, JSON: false, console: false*/
"use strict";
var mysql = require( "mysql" ),
WebSocketServer = require( "ws" ).Server,
winston = require( "winston" ),
fs = require( "fs" ),
wss,
sqlPool,
setupSubscriber,
formatDate,
sources = [],
setupLogger,
logger,
var ls,
config,
setupWebSockets,
setupSQL;
setupSQL = function() {
return mysql.createPool( {
host: config.db.host,
database: config.db.name,
user: config.db.user,
password: config.db.password,
charset: config.db.charset || "UTF8_GENERAL_CI"
} );
};
setupWebSockets = function() {
wss = new WebSocketServer( { port: config.websockets.port } );
// Broadcast
wss.broadcast = function broadcast( data ) {
wss.clients.forEach( function each( client ) {
client.send( data );
} );
};
return wss;
};
setupLogger = function() {
var transports = [],
transport,
len,
i,
type,
logger;
if ( !config.log || !config.log.transports ) {
logger = {};
logger.debug = function() { };
logger.log = logger.debug;
logger.info = logger.debug;
return logger;
}
for ( i = 0, len = config.log.transports.length; i < len; i += 1 ) {
transport = config.log.transports[ i ];
type = transport.type || "Console";
type = type.charAt( 0 ).toUpperCase() + type.slice( 1 ); /* ucfirst() */
delete transport.type;
// TODO: Handle cases where call() fails due to bad "type" value
transports.push( new winston.transports[ type ]( transport ) );
}
logger = new ( winston.Logger )(
{
transports: transports,
exitOnError: false
}
);
return logger;
};
// ISO String to MySQL DATETIME format
// YYYY-MM-DDTHH:MM:SS.sssZ to YYYY-MM-DD HH:MM:SS
// TODO: convert all dates to the same timezone (UTC? EST? EDT? GMT? wat.)
formatDate = function( date ) {
var d = new Date( date );
// Invalide date, fallback to "now"
if ( isNaN( d.getTime() ) ) {
d = new Date();
}
return d.toISOString().slice( 0, -5 ).replace( "T", " " );
};
setupSubscriber = function() {
var pubSubHubbub = require( "pubsubhubbub" ),
pubSubSubscriber = pubSubHubbub.createServer( {
"callbackUrl": config.callback.url + ":" + config.callback.port
} );
pubSubSubscriber.on( "error", function( err ) {
logger.debug( "An error as occured: " );
logger.debug( err );
} );
pubSubSubscriber.on( "listen", function() {
/**
* Sub / Unsub
*/
pubSubSubscriber.on( "subscribe", function( data ) {
var now = formatDate( new Date() ),
expires = formatDate( new Date( data.lease * 1000 ) ), // 'lease' is in seconds, Date() needs ms
type = types[ data.topic ];
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[onSubscribe] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"INSERT INTO subscription( topic, huburi, type, secret, state, last_ping, created, modified, sub_start, sub_end ) VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" +
"ON DUPLICATE KEY UPDATE modified=NOW(), sub_start=NOW(), sub_end=?;",
[ data.topic, data.hub, type, null, "active", null, now, null, now, expires, expires ],
function( err ) {
if ( err ) {
logger.debug( "Error inserting subscription: " + err.stack );
return;
}
logger.debug( "Inserted subscription in SQL table" );
}
);
connection.release();
} );
logger.debug( data.topic + " subscribed" );
} );
// Subscribe
var subs = config.subs, /* TODO: error handling */
len = subs.length,
i,
sub,
types = [];
for ( i = 0; i < len; i += 1 ) {
sub = subs[ i ];
if ( !sources[ sub.topic ] ) {
sources[ sub.topic ] = require( "./sources/" + sub.type + ".js" );
types[ sub.topic ] = sub.type;
}
pubSubSubscriber.subscribe( sub.topic, sub.hub );
}
/**
* Setup PuSH
*/
pubSubSubscriber.on( "feed", function( response ) {
var subscription,
topic = response.topic,
url = topic;
logger.debug( "feed event" );
// Make sure we're subscribed to the feed we got a ping from
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[onFeed] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"SELECT id, topic, type, last_ping FROM subscription WHERE topic = ?",
[ topic ],
function( err, res ) {
var req = ( url.substr( 0, 8 ) === "https://" ) ? require( "https" ) : require( "http" );
if ( err ) {
logger.debug( "Couldn't fetch subscription from DB: " + err.stack );
return;
}
if ( res.length === 0 ) {
logger.debug( "Received a ping from a topic we're not subscribed to." );
logger.debug( "Topic: " + topic );
return;
}
subscription = res[ 0 ];
function handleResponse( res ) {
var data = "",
event;
res.on( "data", function( chunk ) {
data += chunk;
} );
res.on( "end", function() {
logger.debug( "got data" );
event = sources[ subscription.topic ].parse( subscription, data );
if ( event === null ) {
logger.debug( "Got a ping, but nothing to insert" );
logger.debug( "Topic: " + topic );
return;
}
/**
* Insert new event in DB
*/
var published = formatDate( event.published );
// TODO: Update date
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[parseData] Couln't connect to sql: " + err.stack );
return;
}
connection.query(
"INSERT INTO event( subscription_id, title, content, published, updated, foreign_url, object_type, object_verb ) VALUES( ?, ?, ?, ?, ?, ?, ?, ? );",
[ subscription.id, event.title, event.content, published, null, event.source, event.objectType, event.verb ],
function( err, result ) {
if ( err ) {
logger.debug( "Error inserting event: " + err.stack );
return;
}
// Tell websockets what type of event this is
event.type = subscription.type;
// Tell websockets the id of the new item we inserted
event.id = result.insertId;
// Notify websockets
wss.broadcast( JSON.stringify( event ) );
}
);
connection.release();
} );
} );
}
// HTTP(S) get
req.get( url, handleResponse )
.on( "error", function( e ) {
logger.debug( "Got error trying to fetch topic: " + e.message );
logger.debug( "Topic: " + topic );
} );
}
);
connection.release();
} );
} );
} );
// Start listening for pings
pubSubSubscriber.listen( config.callback.port );
};
fs = require( "fs" ),
Lifestream = require( "./classes/Lifestream.js" );
/**
* Read config file
@ -296,12 +31,6 @@
}
}
logger = setupLogger();
wss = setupWebSockets();
ls = new Lifestream( config );
sqlPool = setupSQL();
setupSubscriber();
}() );

+ 2
- 1
package.json View File

@ -14,6 +14,7 @@
"xmldom": "~0.1.19",
"mysql": "~2.6.2",
"winston": "~1.0.0",
"cheerio": "0.19.0"
"cheerio": "0.19.0",
"extend": "~3.0.0"
}
}

+ 1
- 1
sources/blog.js View File

@ -4,7 +4,7 @@
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
exports.parse = function( data ) {
var event = {},
DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),


+ 1
- 1
sources/gnufm.js View File

@ -6,7 +6,7 @@
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
exports.parse = function( data ) {
var event = {},
DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),


+ 1
- 1
sources/gnusocial.js View File

@ -7,7 +7,7 @@
// TODO: cleanup
// TODO: error handling
exports.parse = function( subscription, data ) {
exports.parse = function( data ) {
var activityRE = /([^\/]+)$/,
DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),


+ 2
- 2
sources/gogs.js View File

@ -45,14 +45,14 @@
return html + "</ul>";
};
exports.parse = function( subscription, data ) {
exports.parse = function( data, topic ) {
var event = {},
cheerio = require( "cheerio" ),
$ = cheerio.load( data, { decodeEntities: true } ),
$latest = $( ".news" ).first(),
$items = $latest.find( ".content li" ), // Commits + optional "compare" link
$lastLink = $items.last().find( "a" ), // Either the last commit or the "compare" link
urlParts = urlParser.parse( subscription.topic );
urlParts = urlParser.parse( topic );
hostname = urlParts.protocol + "//" + urlParts.host;


+ 1
- 1
sources/mediagoblin.js View File

@ -4,7 +4,7 @@
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
exports.parse = function( data ) {
var DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),
contentTitle,


+ 1
- 1
sources/pullet.js View File

@ -4,7 +4,7 @@
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
exports.parse = function( data ) {
var event = {},
DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),


Loading…
Cancel
Save