Browse Source

Initial commit

master
Chimo 6 years ago
commit
51a83a301c
10 changed files with 547 additions and 0 deletions
  1. +3
    -0
      .gitignore
  2. +9
    -0
      .jscsrc
  3. +14
    -0
      .jshintrc
  4. +44
    -0
      config.dist.json
  5. +286
    -0
      main.js
  6. +17
    -0
      package.json
  7. +31
    -0
      schema.sql
  8. +27
    -0
      sources/blog.js
  9. +72
    -0
      sources/gnusocial.js
  10. +44
    -0
      sources/mediagoblin.js

+ 3
- 0
.gitignore View File

@ -0,0 +1,3 @@
node_modules
config.json
*.log

+ 9
- 0
.jscsrc View File

@ -0,0 +1,9 @@
{
"preset": "jquery",
"validateLineBreaks": null,
"requireCamelCaseOrUpperCaseIdentifiers": null,
"maximumLineLength": 500,
"requireBlocksOnNewline": true,
"requireSpaceBeforeBlockStatements": true,
"disallowMixedSpacesAndTabs": true
}

+ 14
- 0
.jshintrc View File

@ -0,0 +1,14 @@
{
"boss": true,
"curly": true,
"eqeqeq": true,
"eqnull": true,
"expr": true,
"immed": true,
"noarg": true,
"undef": true,
"unused": true,
"jquery": true,
"browser": true,
"es3": true
}

+ 44
- 0
config.dist.json View File

@ -0,0 +1,44 @@
{
"port": 1337,
"db": {
"host": "localhost",
"name": "lifestream",
"user": "lifestream",
"password": ""
},
"subs": [
{
"topic": "",
"hub": "",
"type": ""
},
{
"topic": "",
"hub": "",
"type": ""
}
],
"websockets": {
"port": 8090
},
"log": {
"transports": [
{
"type": "console",
"level": "debug",
"handleExceptions": true
},
{
"type": "file",
"level": "debug",
"handleExceptions": true,
"filename": "lifestream.log"
}
]
}
}

+ 286
- 0
main.js View File

@ -0,0 +1,286 @@
( function() {
/*global require: false, JSON: false*/
"use strict";
var mysql = require( "mysql" ),
WebSocketServer = require( "ws" ).Server,
winston = require( "winston" ),
fs = require( "fs" ),
wss,
sqlPool,
setupSubscriber,
formatDate,
sources = [],
setupLogger,
logger,
config,
setupWebSockets,
setupSQL;
setupSQL = function() {
return sqlPool = mysql.createPool( {
host: config.db.host,
database: config.db.name,
user: config.db.user,
password: config.db.password
} );
};
setupWebSockets = function() {
wss = new WebSocketServer( { port: config.websockets.port } );
// Broadcast
wss.broadcast = function broadcast( data ) {
wss.clients.forEach( function each( client ) {
client.send( data );
} );
};
return wss;
};
setupLogger = function() {
var transports = [],
transport,
len,
i,
type,
logger;
if ( !config.log || !config.log.transports ) {
logger = {};
logger.debug = function() { };
logger.log = logger.debug;
logger.info = logger.debug;
return logger;
}
for ( i = 0, len = config.log.transports.length; i < len; i += 1 ) {
transport = config.log.transports[ i ];
type = transport.type || "Console";
type = type.charAt( 0 ).toUpperCase() + type.slice( 1 ); /* ucfirst() */
delete transport.type;
// TODO: Handle cases where call() fails due to bad "type" value
transports.push( new winston.transports[ type ]( transport ) );
}
logger = new ( winston.Logger )(
{
transports: transports,
exitOnError: false
}
);
return logger;
};
// ISO String to MySQL DATETIME format
// YYYY-MM-DDTHH:MM:SS.sssZ to YYYY-MM-DD HH:MM:SS
// TODO: convert all dates to the same timezone (UTC? EST? EDT? GMT? wat.)
formatDate = function( date ) {
var d = new Date( date );
// Invalide date, fallback to "now"
if ( isNaN( d.getTime() ) ) {
d = new Date();
}
return d.toISOString().slice( 0, -5 ).replace( "T", " " );
};
setupSubscriber = function() {
var http = require( "http" ),
pubSubHubbub = require( "pubsubhubbub" ),
pubSubSubscriber = pubSubHubbub.createServer( {
"callbackUrl": "http://chromic.org:1337/"/*,
"secret": "secret" // TODO: do we need this? is it used?
*/
} );
pubSubSubscriber.on( "error", function( err ) {
logger.debug( "An error as occured: " );
logger.debug( err );
} );
pubSubSubscriber.on( "listen", function() {
/**
* Sub / Unsub
*/
pubSubSubscriber.on( "subscribe", function( data ) {
var tmpDate = formatDate( new Date() );
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[onSubscribe] Coulnd't connect to SQL: " + err.stack );
return;
}
connection.query(
/* TODO: should we also refresh sub_start, sub_end (?) -- does the hub see it as a "extend my subscription" deal? */
"INSERT INTO subscription( topic, huburi, type, secret, state, last_ping, created, modified, sub_start, sub_end ) VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" +
"ON DUPLICATE KEY UPDATE modified=NOW();",
[ data.topic, "FIXME: should be huburi", sub.type, null, "active", null, tmpDate, null, tmpDate, tmpDate ],
function( err ) {
if ( err ) {
logger.debug( "Error inserting subscription: " + err.stack );
return;
}
logger.debug( "Inserted subscription in SQL table" );
}
);
connection.release();
} );
logger.debug( data.topic + " subscribed" );
} );
// Subscribe
var subs = config.subs, /* TODO: error handling */
len = subs.length,
i,
sub;
for ( i = 0; i < len; i += 1 ) {
sub = subs[ i ];
if ( !sources[ sub.topic] ) {
sources[ sub.topic ] = require( "./sources/" + sub.type + ".js" );
}
pubSubSubscriber.subscribe( sub.topic, sub.hub );
}
/**
* Setup PuSH
*/
pubSubSubscriber.on( "feed", function( response ) {
var subscription,
topic = response.topic,
url = topic;
logger.debug( "feed event" );
// TODO: do we need make sure we're subscribed to this thing?
// anybody could send (fake) ping request to our callback
// is there any kind of security in the PuSH spec -- HMAC, secret, ...?
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[onFeed] Couldn't connect to SQL: " + err.stack );
return;
}
connection.query(
"SELECT id, topic, last_ping FROM subscription WHERE topic = ?",
[ topic ],
function( err, res ) {
if ( err ) {
logger.debug( "Couldn't fetch subscription from DB: " + err.stack );
return;
}
if ( res.length === 0 ) {
logger.debug( "Received a ping from a topic we're not subscribed to." );
logger.debug( "Topic: " + topic );
return;
}
subscription = res[ 0 ];
// FIXME: tmp exception for GS; we parse .as instead of .atom for now
// TODO: use DOMParser like we do for the blog
if ( topic === "http://sn.chromic.org/api/statuses/user_timeline/1.atom" ) {
url = "http://sn.chromic.org/api/statuses/user_timeline/1.as";
}
// Get feed
http.get( url, function( res ) {
var data = "",
event;
res.on( "data", function( chunk ) {
data += chunk;
} );
res.on( "end", function() {
logger.debug( "got data" );
event = sources[ subscription.topic ].parse( subscription, data );
if ( event === null ) {
logger.debug( "Got a ping, but nothing to insert" );
logger.debug( "Topic: " + topic );
return;
}
/**
* Insert new event in DB
*/
var published = formatDate( event.published );
// TODO: Update date
sqlPool.getConnection( function( err, connection ) {
if ( err ) {
logger.debug( "[parseData] Couln't connect to sql: " + err.stack );
return;
}
connection.query(
"INSERT INTO event( subscription_id, title, content, published, updated, foreign_url, object_type, object_verb ) VALUES( ?, ?, ?, ?, ?, ?, ?, ? );",
[ subscription.id, event.title, event.content, published, null, event.source, event.type, event.verb ],
function( err ) {
if ( err ) {
logger.debug( "Error inserting event: " + err.stack );
return;
}
// Notify websockets
wss.broadcast( JSON.stringify( event ) );
}
);
connection.release();
} );
} );
} ).on( "error", function( e ) {
logger.debug( "Got error trying to fetch topic: " + e.message );
logger.debug( "Topic: " + topic );
} );
}
);
connection.release();
} );
} );
} );
// Start listening for pings
pubSubSubscriber.listen( config.port );
};
/**
* Read config file
* TODO: error handling
*/
config = fs.readFileSync( "./config.json" );
config = JSON.parse( config );
wss = setupWebSockets();
sqlPool = setupSQL();
logger = setupLogger();
setupSubscriber();
}() );

+ 17
- 0
package.json View File

@ -0,0 +1,17 @@
{
"name": "lifestream",
"version": "0.0.1",
"description": "Realtime lifestream",
"repository": {
"type": "git",
"url": "https://github.com/chimo/lifestream"
},
"author": "Chimo",
"license": "",
"dependencies": {
"pubsubhubbub": "~0.4.1",
"xmldom": "~0.1.19",
"mysql": "~2.6.2",
"winston": "~1.0.0"
}
}

+ 31
- 0
schema.sql View File

@ -0,0 +1,31 @@
CREATE TABLE `subscription` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`topic` text NOT NULL,
`type` varchar(191) NOT NULL,
`huburi` text NOT NULL,
`secret` text,
`state` enum("subscribe", "active", "unsubscribe", "inactive", "nohub") NOT NULL,
`last_ping` datetime, # Last time we heard from the hub
`created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, # Date this record was created
`modified` datetime, # Date this record was modified (ex: expires, subbed, unsubbed, etc.)
`sub_start` datetime NOT NULL, # Date the (re)subscribtion started
`sub_end` datetime NOT NULL, # Date the subscribtion is set to expire
PRIMARY KEY (`id`),
UNIQUE(topic(100))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE `event` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`subscription_id` int(11) NOT NULL,
`title` text NOT NULL,
`content` text,
`published` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated` datetime DEFAULT CURRENT_TIMESTAMP,
`foreign_url` text NOT NULL,
`object_type` varchar(191) NOT NULL,
`object_verb` varchar(191) NOT NULL,
PRIMARY KEY (`id`),
FOREIGN KEY (subscription_id)
REFERENCES subscription(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

+ 27
- 0
sources/blog.js View File

@ -0,0 +1,27 @@
( function() {
/*global module: false, require: false*/
"use strict";
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
var event = {},
DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),
item;
item = doc.getElementsByTagName( "entry" )[ 0 ];
event.title = "Posted an article";
event.source = item.getElementsByTagName( "link" )[ 0 ].getAttribute( "href" );
event.published = item.getElementsByTagName( "published" )[0].textContent;
event.content = "<h2>" + item.getElementsByTagName( "title" )[ 0 ].textContent + "</h2>" +
item.getElementsByTagName( "summary" )[0].textContent +
"<p><a href='" + event.source + "'>read more...</a></p>";
event.type = "article";
event.verb = "post";
return event;
};
}() );

+ 72
- 0
sources/gnusocial.js View File

@ -0,0 +1,72 @@
( function() {
/*global module: false, JSON: false*/
"use strict";
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
var last_ping = subscription.last_ping,
activityRE = /([^\/]+)$/,
item,
by,
event = {};
data = JSON.parse( data );
// TODO: Maybe we should get all items we don't have, not just most recent one
/* This is the first time we get data from the hub.
Only get the latest item. */
if ( last_ping === null ) {
item = data.items[0];
event.published = item.published;
/* event.updated = formatDate( item.updated ); */
event.verb = activityRE.exec( item.verb )[0];
event.type = activityRE.exec( item.object.objectType )[0];
event.source = item.url;
switch ( event.verb ) {
case "favorite":
by = /by ([^:]+)/.exec( item.content )[0];
event.title = "Favorited a " + event.type + " by " + by;
event.content = item.object.content;
event.source = item.object.url; /* Link to foreign instance (favs don't have local urls) */
break;
case "share": /* repeat */
event.title = "Repeated a " + event.type;
event.conent = item.content;
break;
case "post":
event.title = "Posted a " + event.type;
if ( event.type === "bookmark" ) {
event.content = item.content;
} else { /* note, comment */
event.content = item.object.content;
}
break;
case "join":
event.title = "Joined a " + event.type;
event.content = "<a href='" + item.object.url + "'>" +
item.object.portablecontacts_net.displayName + "</a> " +
"(" + item.object.portablecontacts_net.preferredUsername + ")<br />" +
item.object.portablecontacts_net.note;
break;
case "follow":
event.title = "Started following";
event.content = "<a href='" + item.object.url + "'>" +
"<img src='" + item.object.image.url + "' rel='avatar' alt='" + item.object.displayName + "\'s avatar'>" +
item.object.displayName +
"</a>";
break;
default:
event.title = item.title;
event.content = item.content;
break;
}
}
return event;
};
}() );

+ 44
- 0
sources/mediagoblin.js View File

@ -0,0 +1,44 @@
( function() {
/*global module: false, require: false*/
"use strict";
var exports = module.exports = {};
exports.parse = function( subscription, data ) {
var DOMParser = require( "xmldom" ).DOMParser,
doc = new DOMParser().parseFromString( data ),
contentTitle,
event = {},
item = doc.getElementsByTagName( "entry" )[ 0 ],
links = item.getElementsByTagName( "link" ),
img;
// TODO: might not be an image -- check type once it's in the feed
event.title = "Posted an image";
// FIXME: should fetch based on "type" attr, not node position
event.source = links[0].getAttribute( "href" );
img = "<img src='" + links[1].getAttribute( "href" ) + "'>";
event.published = item.getElementsByTagName( "updated" )[ 0 ].textContent;
contentTitle = "<h2>" + item.getElementsByTagName( "title" )[ 0 ].textContent + "</h2>";
// Might not be present
event.content = item.getElementsByTagName( "content" ).item( 0 );
if ( event.content ) {
event.content = event.content.textContent;
} else {
event.content = "";
}
event.content = contentTitle + event.content + "<br>" + img;
event.type = "image";
event.verb = "post";
return event;
};
}() );

Loading…
Cancel
Save